Class: Mongo::Collection::View::ChangeStream
Relationships & Source Files | |
Namespace Children | |
Modules:
| |
Super Chains via Extension / Inclusion / Inheritance | |
Class Chain:
self,
Aggregation
|
|
Instance Chain:
self,
Aggregation ,
Aggregation::Behavior ,
::Mongo::Retryable ,
::Mongo::Loggable ,
Explainable ,
Iterable ,
::Mongo::CursorHost ,
Immutable ,
Enumerable
|
|
Inherits: |
Mongo::Collection::View::Aggregation
|
Defined in: | lib/mongo/collection/view/change_stream.rb, lib/mongo/collection/view/change_stream/retryable.rb |
Overview
Only available in server versions 3.6 and higher.
ChangeStreams do not work properly with JRuby because of the issue documented here: github.com/jruby/jruby/issues/4212. Namely, JRuby eagerly evaluates #next on an Enumerator in a background green thread, therefore calling #next on the change stream will cause getMores to be called in a loop in the background.
Provides behavior around a $changeStream
pipeline stage in the aggregation framework. Specifying this stage allows users to request that notifications are sent for all changes to a particular collection or database.
Constant Summary
-
CLUSTER =
:cluster
-
DATABASE =
:database
-
FULL_DOCUMENT_DEFAULT =
'default'.freeze
Explainable
- Included
ALL_PLANS_EXECUTION, EXECUTION_STATS, QUERY_PLANNER
::Mongo::Loggable
- Included
Class Method Summary
-
.new(view, pipeline, changes_for, options = {}) ⇒ ChangeStream
constructor
Initialize the change stream for the provided collection view, pipeline and options.
Aggregation
- Inherited
.new | Initialize the aggregation for the provided collection view, pipeline and options. |
Instance Attribute Summary
-
#closed? ⇒ true, false
readonly
Is the change stream closed?
- #cursor ⇒ Cursor readonly Internal use only Internal use only
- #options ⇒ BSON::Document readonly
- #for_cluster? ⇒ Boolean readonly private
- #for_collection? ⇒ Boolean readonly private
- #for_database? ⇒ Boolean readonly private
- #resuming? ⇒ Boolean readonly private
Aggregation
- Inherited
Aggregation::Behavior
- Included
Explainable
- Included
Iterable
- Included
#prefer_cached_cursor? | If the caching cursor is closed and was not fully iterated, the documents we have in it are not the complete result set and we have no way of completing that iteration. |
#use_query_cache? |
::Mongo::CursorHost
- Included
#cursor | Returns the cursor associated with this view, if any. |
#timeout_mode |
Immutable
- Included
Instance Method Summary
-
#close(opts = {}) ⇒ nil
Close the change stream.
-
#cursor_type ⇒ Object
“change streams are an abstraction around tailable-awaitData cursors…”.
-
#each {|Each| ... } ⇒ Enumerator
Iterate through documents returned by the change stream.
-
#inspect ⇒ String
Get a formatted string for use in inspection.
-
#max_await_time_ms ⇒ Integer | nil
Returns the value of the max_await_time_ms option that was passed to this change stream.
-
#resume_token ⇒ BSON::Document | nil
Returns the resume token that the stream will use to automatically resume, if one exists.
-
#timeout_mode ⇒ Object
“change streams…implicitly use ITERATION mode”.
- #to_enum
-
#try_next ⇒ BSON::Document | nil
Return one document from the change stream, if one is available.
- #aggregate_spec(session, read_preference) private
- #change_doc private
- #create_cursor!(timeout_ms = nil) private
- #pipeline private
-
#recreate_cursor!(context = nil)
private
Recreates the current cursor (typically as a consequence of attempting to resume the change stream).
- #send_initial_query(connection, context) private
- #time_to_bson_timestamp(time) private
Aggregation
- Inherited
#effective_read_preference | Return effective read preference for the operation. |
#initial_query_op, #new, #send_initial_query |
Aggregation::Behavior
- Included
#allow_disk_use | Set to true if disk usage is allowed during the aggregation. |
#explain | Get the explain plan for the aggregation. |
#timeout_ms, #aggregate_spec, | |
#cache_options | Skip, sort, limit, projection are specified as pipeline stages rather than as options. |
#operation_timeouts, | |
#perform_setup | Common setup for all classes that include this behavior; the constructor should invoke this method. |
#server_selector |
::Mongo::Retryable
- Included
#read_worker | Returns the read worker for handling retryable reads. |
#select_server | This is a separate method to make it possible for the test suite to assert that server selection is performed during retry attempts. |
#write_worker | Returns the write worker for handling retryable writes. |
::Mongo::Loggable
- Included
#log_debug | Convenience method to log debug messages with the standard prefix. |
#log_error | Convenience method to log error messages with the standard prefix. |
#log_fatal | Convenience method to log fatal messages with the standard prefix. |
#log_info | Convenience method to log info messages with the standard prefix. |
#log_warn | Convenience method to log warn messages with the standard prefix. |
#logger | Get the logger instance. |
#_mongo_log_prefix, #format_message |
Explainable
- Included
#explain | Get the query plan for the query. |
#explain_options |
Iterable
- Included
#close_query | Cleans up resources associated with this query. |
#each | Iterate through documents returned by a query with this |
#kill_cursors | Alias for Iterable#close_query. |
#cache_options, #cached_cursor, #compute_limit_for_cached_query, #initial_query_op, | |
#maybe_set_tailable_options | Add tailable cusror options to the command specifiction if needed. |
#new_cursor_for_iteration | Start a new cursor for use when iterating (via #each). |
#oplog_replay, #select_cursor, #send_initial_query |
::Mongo::CursorHost
- Included
#validate_timeout_mode! | Ensure the timeout mode is appropriate for other options that have been given. |
Immutable
- Included
Constructor Details
.new(view, pipeline, changes_for, options = {}) ⇒ ChangeStream
Initialize the change stream for the provided collection view, pipeline and options.
# File 'lib/mongo/collection/view/change_stream.rb', line 133
def initialize(view, pipeline, changes_for, = {}) # change stream cursors can only be :iterable, so we don't allow # timeout_mode to be specified. perform_setup(view, , forbid: %i[ timeout_mode ]) do @changes_for = changes_for @change_stream_filters = pipeline && pipeline.dup @start_after = @options[:start_after] end # The resume token tracked by the change stream, used only # when there is no cursor, or no cursor resume token @resume_token = @start_after || @options[:resume_after] create_cursor! # We send different parameters when we resume a change stream # compared to when we send the first query @resuming = true end
Instance Attribute Details
#closed? ⇒ true
, false
(readonly)
Is the change stream closed?
# File 'lib/mongo/collection/view/change_stream.rb', line 273
def closed? @cursor.nil? end
#cursor ⇒ Cursor (readonly)
# File 'lib/mongo/collection/view/change_stream.rb', line 67
attr_reader :cursor
#for_cluster? ⇒ Boolean
(readonly, private)
# File 'lib/mongo/collection/view/change_stream.rb', line 328
def for_cluster? @changes_for == CLUSTER end
#for_collection? ⇒ Boolean
(readonly, private)
# File 'lib/mongo/collection/view/change_stream.rb', line 336
def for_collection? !for_cluster? && !for_database? end
#for_database? ⇒ Boolean
(readonly, private)
# File 'lib/mongo/collection/view/change_stream.rb', line 332
def for_database? @changes_for == DATABASE end
#options ⇒ BSON::Document
(readonly)
# File 'lib/mongo/collection/view/change_stream.rb', line 63
attr_reader :
#resuming? ⇒ Boolean
(readonly, private)
# File 'lib/mongo/collection/view/change_stream.rb', line 456
def resuming? !!@resuming end
Instance Method Details
#aggregate_spec(session, read_preference) (private)
# File 'lib/mongo/collection/view/change_stream.rb', line 379
def aggregate_spec(session, read_preference) super(session, read_preference).tap do |spec| spec[:selector][:aggregate] = 1 unless for_collection? end end
#change_doc (private)
# File 'lib/mongo/collection/view/change_stream.rb', line 385
def change_doc {}.tap do |doc| if @options[:full_document] doc[:fullDocument] = @options[:full_document] end if @options[:full_document_before_change] doc[:fullDocumentBeforeChange] = @options[:full_document_before_change] end if @options.key?(: ) doc[:showExpandedEvents] = @options[: ] end if resuming? # We have a resume token once we retrieved any documents. # However, if the first getMore fails and the user didn't pass # a resume token we won't have a resume token to use. # Use start_at_operation time in this case if resume_token # Spec says we need to remove both startAtOperationTime and startAfter if # either was passed in by user, thus we won't forward them doc[:resumeAfter] = resume_token elsif @start_at_operation_time_supported && @start_at_operation_time # It is crucial to check @start_at_operation_time_supported # here - we may have switched to an older server that # does not support operation times and therefore shouldn't # try to send one to it! # # @start_at_operation_time is already a BSON::Timestamp doc[:startAtOperationTime] = @start_at_operation_time else # Can't resume if we don't have either raise Mongo::Error::MissingResumeToken end else if @start_after doc[:startAfter] = @start_after elsif resume_token doc[:resumeAfter] = resume_token end if [:start_at_operation_time] doc[:startAtOperationTime] = ( [:start_at_operation_time]) end end doc[:allChangesForCluster] = true if for_cluster? end end
#close(opts = {}) ⇒ nil
This method attempts to close the cursor used by the change stream, which in turn closes the server-side change stream cursor. This method ignores any errors that occur when closing the server-side cursor.
Close the change stream.
# File 'lib/mongo/collection/view/change_stream.rb', line 254
def close(opts = {}) unless closed? begin @cursor.close(opts) rescue Error::OperationFailure::Family, Error::SocketError, Error::SocketTimeoutError, Error::MissingConnection # ignore end @cursor = nil end end
#create_cursor!(timeout_ms = nil) (private)
# File 'lib/mongo/collection/view/change_stream.rb', line 340
def create_cursor!(timeout_ms = nil) # clear the cache because we may get a newer or an older server # (rolling upgrades) @start_at_operation_time_supported = nil session = client.get_session(@options) context = Operation::Context.new(client: client, session: session, view: self, operation_timeouts: timeout_ms ? { operation_timeout_ms: timeout_ms } : operation_timeouts) start_at_operation_time = nil start_at_operation_time_supported = nil @cursor = read_with_retry_cursor(session, server_selector, self, context: context) do |server| server.with_connection do |connection| start_at_operation_time_supported = connection.description.server_version_gte?('4.0') result = send_initial_query(connection, context) if doc = result.replies.first && result.replies.first.documents.first start_at_operation_time = doc['operationTime'] else # The above may set @start_at_operation_time to nil # if it was not in the document for some reason, # for consistency set it to nil here as well. # NB: since this block may be executed more than once, each # execution must write to start_at_operation_time either way. start_at_operation_time = nil end result end end @start_at_operation_time = start_at_operation_time @start_at_operation_time_supported = start_at_operation_time_supported end
#cursor_type ⇒ Object
“change streams are an abstraction around tailable-awaitData cursors…”
# File 'lib/mongo/collection/view/change_stream.rb', line 307
def cursor_type :tailable_await end
#each {|Each| ... } ⇒ Enumerator
Iterate through documents returned by the change stream.
This method retries once per error on resumable errors (two consecutive errors result in the second error being raised, an error which is recovered from resets the error count to zero).
#inspect ⇒ String
Get a formatted string for use in inspection.
# File 'lib/mongo/collection/view/change_stream.rb', line 285
def inspect "#<Mongo::Collection::View:ChangeStream:0x#{object_id} filters=#{@change_stream_filters} " + "options=#{@options} resume_token=#{resume_token}>" end
#max_await_time_ms ⇒ Integer
| nil
Returns the value of the max_await_time_ms option that was passed to this change stream.
# File 'lib/mongo/collection/view/change_stream.rb', line 322
def max_await_time_ms [:max_await_time_ms] end
#pipeline (private)
# File 'lib/mongo/collection/view/change_stream.rb', line 375
def pipeline [{ '$changeStream' => change_doc }] + @change_stream_filters end
#recreate_cursor!(context = nil) (private)
Recreates the current cursor (typically as a consequence of attempting to resume the change stream)
# File 'lib/mongo/collection/view/change_stream.rb', line 462
def recreate_cursor!(context = nil) @timed_out = false close create_cursor!(context&.remaining_timeout_ms) end
#resume_token ⇒ BSON::Document
| nil
Returns the resume token that the stream will use to automatically resume, if one exists.
# File 'lib/mongo/collection/view/change_stream.rb', line 299
def resume_token cursor_resume_token = @cursor.resume_token if @cursor cursor_resume_token || @resume_token end
#send_initial_query(connection, context) (private)
# File 'lib/mongo/collection/view/change_stream.rb', line 437
def send_initial_query(connection, context) initial_query_op(context.session, view.read_preference) .execute_with_connection( connection, context: context, ) end
#time_to_bson_timestamp(time) (private)
# File 'lib/mongo/collection/view/change_stream.rb', line 445
def (time) if time.is_a?(Time) seconds = time.to_f BSON::Timestamp.new(seconds.to_i, ((seconds - seconds.to_i) * 1000000).to_i) elsif time.is_a?(BSON::Timestamp) time else raise ArgumentError, 'Time must be a Time or a BSON::Timestamp instance' end end
#timeout_mode ⇒ Object
“change streams…implicitly use ITERATION mode”
# File 'lib/mongo/collection/view/change_stream.rb', line 314
def timeout_mode :iteration end
#to_enum
#try_next ⇒ BSON::Document
| nil
Return one document from the change stream, if one is available.
Retries once on a resumable error.
Raises StopIteration if the change stream is closed.
This method will wait up to max_await_time_ms milliseconds for changes from the server, and if no changes are received it will return nil.
# File 'lib/mongo/collection/view/change_stream.rb', line 191
def try_next recreate_cursor! if @timed_out raise StopIteration.new if closed? begin doc = @cursor.try_next rescue Mongo::Error => e # "If a next call fails with a timeout error, drivers MUST NOT # invalidate the change stream. The subsequent next call MUST # perform a resume attempt to establish a new change stream on the # server..." # # However, SocketTimeoutErrors are TimeoutErrors, but are also # change-stream-resumable. To preserve existing (specified) behavior, # We only count timeouts when the error is not also # change-stream-resumable. @timed_out = e.is_a?(Mongo::Error::TimeoutError) && !e.change_stream_resumable? raise unless @timed_out || e.change_stream_resumable? @resume_token = @cursor.resume_token raise e if @timed_out recreate_cursor!(@cursor.context) retry end # We need to verify each doc has an _id, so we # have a resume token to work with if doc && doc['_id'].nil? raise Error::MissingResumeToken end doc end