
Class: Mongo::Collection::View::ChangeStream

Relationships & Source Files
Namespace Children
Modules:
Super Chains via Extension / Inclusion / Inheritance
Class Chain: self, Aggregation
Inherits: Mongo::Collection::View::Aggregation
Defined in: lib/mongo/collection/view/change_stream.rb, lib/mongo/collection/view/change_stream/retryable.rb

Overview

Note:

Only available in server versions 3.6 and higher.

Note:

ChangeStreams do not work properly with JRuby because of the issue documented at github.com/jruby/jruby/issues/4212: JRuby eagerly evaluates #next on an Enumerator in a background green thread, so calling #next on the change stream causes getMores to be issued in a loop in the background.

Provides behavior around a $changeStream pipeline stage in the aggregation framework. Specifying this stage allows users to request notifications for all changes to a particular collection, a particular database, or the entire cluster.

Since:

  • 2.5.0

Constant Summary

Explainable - Included

ALL_PLANS_EXECUTION, EXECUTION_STATS, QUERY_PLANNER

::Mongo::Loggable - Included

PREFIX

Class Method Summary

Aggregation - Inherited

.new

Initialize the aggregation for the provided collection view, pipeline and options.

Instance Attribute Summary

Aggregation - Inherited

Aggregation::Behavior - Included

#view,
#write?

Whether this aggregation will write its result to a database collection.

Explainable - Included

Iterable - Included

#prefer_cached_cursor?

If the caching cursor is closed and was not fully iterated, the documents we have in it are not the complete result set and we have no way of completing that iteration.

#use_query_cache?

::Mongo::CursorHost - Included

#cursor

Returns the cursor associated with this view, if any.

#timeout_mode

Immutable - Included

Instance Method Summary

Aggregation - Inherited

#effective_read_preference

Return effective read preference for the operation.

#initial_query_op, #new, #send_initial_query

Aggregation::Behavior - Included

#allow_disk_use

Set to true if disk usage is allowed during the aggregation.

#explain

Get the explain plan for the aggregation.

#timeout_ms, #aggregate_spec,
#cache_options

Skip, sort, limit, projection are specified as pipeline stages rather than as options.

#operation_timeouts,
#perform_setup

Common setup for all classes that include this behavior; the constructor should invoke this method.

#server_selector

::Mongo::Retryable - Included

#read_worker

Returns the read worker for handling retryable reads.

#select_server

This is a separate method to make it possible for the test suite to assert that server selection is performed during retry attempts.

#write_worker

Returns the write worker for handling retryable writes.

::Mongo::Loggable - Included

#log_debug

Convenience method to log debug messages with the standard prefix.

#log_error

Convenience method to log error messages with the standard prefix.

#log_fatal

Convenience method to log fatal messages with the standard prefix.

#log_info

Convenience method to log info messages with the standard prefix.

#log_warn

Convenience method to log warn messages with the standard prefix.

#logger

Get the logger instance.

#_mongo_log_prefix, #format_message

Explainable - Included

#explain

Get the query plan for the query.

#explain_options

Iterable - Included

#close_query

Cleans up resources associated with this query.

#each

Iterate through documents returned by a query with this ::Mongo::Collection::View.

#kill_cursors
#cache_options, #cached_cursor, #compute_limit_for_cached_query, #initial_query_op,
#maybe_set_tailable_options

Add tailable cursor options to the command specification if needed.

#new_cursor_for_iteration

Start a new cursor for use when iterating (via #each).

#oplog_replay, #select_cursor, #send_initial_query

::Mongo::CursorHost - Included

#validate_timeout_mode!

Ensure the timeout mode is appropriate for other options that have been given.

Immutable - Included

Constructor Details

.new(view, pipeline, changes_for, options = {}) ⇒ ChangeStream

Initialize the change stream for the provided collection view, pipeline and options.

Examples:

Create the new change stream view.

ChangeStream.new(view, pipeline, changes_for, options)

Parameters:

  • view (Collection::View)

    The collection view.

  • pipeline (Array<Hash>)

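    The pipeline of operators to filter the change notifications.

  • changes_for

    The scope of the change stream: a single collection, a whole database (DATABASE), or the entire cluster (CLUSTER).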

  • options (Hash) (defaults to: {})

    The change stream options.

Options Hash (options):

  • :full_document (String)

    Allowed values: nil, ‘default’, ‘updateLookup’, ‘whenAvailable’, ‘required’.

    The default is to not send a value (i.e. nil), which is equivalent to ‘default’. By default, the change notification for partial updates will include a delta describing the changes to the document.

    When set to ‘updateLookup’, the change notification for partial updates will include both a delta describing the changes to the document as well as a copy of the entire document that was changed from some time after the change occurred.

    When set to ‘whenAvailable’, configures the change stream to return the post-image of the modified document for replace and update change events if the post-image for this event is available.

    When set to ‘required’, behaves the same as ‘whenAvailable’, except that an error is raised if the post-image is not available.

  • :full_document_before_change (String)

    Allowed values: nil, ‘whenAvailable’, ‘required’, ‘off’.

    The default is to not send a value (i.e. nil), which is equivalent to ‘off’.

    When set to ‘whenAvailable’, configures the change stream to return the pre-image of the modified document for replace, update, and delete change events if it is available.

    When set to ‘required’, behaves the same as ‘whenAvailable’, except that an error is raised if the pre-image is not available.

  • :resume_after (BSON::Document, Hash)

    Specifies the logical starting point for the new change stream.

  • :max_await_time_ms (Integer)

    The maximum amount of time for the server to wait on new documents to satisfy a change stream query.

  • :batch_size (Integer)

    The number of documents to return per batch.

  • :collation (BSON::Document, Hash)

    The collation to use.

  • :start_at_operation_time (BSON::Timestamp)

    Only return changes that occurred at or after the specified timestamp. Any command run against the server will return a cluster time that can be used here. Only recognized by server versions 4.0+.

  • :start_after (BSON::Document, Hash)

    Similar to :resume_after, this option takes a resume token and starts a new change stream returning the first notification after the token. This allows users to watch collections that have been dropped and recreated, or newly renamed collections, without missing any notifications.

  • :comment (Object)

    A user-provided comment to attach to this command.

  • :show_expanded_events (Boolean)

    Enables the server to send the ‘expanded’ list of change stream events. The additional events included when this flag is set are: createIndexes, dropIndexes, modify, create, shardCollection, reshardCollection, refineCollectionShardKey.

Note:

The server will report an error if startAfter and resumeAfter are both specified.
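The value lists documented above can also be checked on the client before a stream is opened. The following is a minimal sketch, not part of the driver: validate_change_stream_options! and both constant names are hypothetical, and the check only encodes the allowed :full_document and :full_document_before_change values plus the startAfter/resumeAfter conflict that the server reports.

```ruby
# Hypothetical client-side check; not part of the Mongo driver.
FULL_DOCUMENT_VALUES = [nil, 'default', 'updateLookup', 'whenAvailable', 'required'].freeze
FULL_DOCUMENT_BEFORE_CHANGE_VALUES = [nil, 'whenAvailable', 'required', 'off'].freeze

def validate_change_stream_options!(options)
  # A missing key reads as nil, which is an allowed value for both options.
  unless FULL_DOCUMENT_VALUES.include?(options[:full_document])
    raise ArgumentError, "invalid :full_document #{options[:full_document].inspect}"
  end
  unless FULL_DOCUMENT_BEFORE_CHANGE_VALUES.include?(options[:full_document_before_change])
    raise ArgumentError,
          "invalid :full_document_before_change #{options[:full_document_before_change].inspect}"
  end
  # The server rejects startAfter together with resumeAfter; fail before sending.
  if options[:start_after] && options[:resume_after]
    raise ArgumentError, ':start_after and :resume_after cannot both be given'
  end

  options
end

validate_change_stream_options!(full_document: 'updateLookup', full_document_before_change: 'off')
```

Rejecting the conflicting tokens locally surfaces the mistake before any command is sent; without such a check the server reports it.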

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 133

def initialize(view, pipeline, changes_for, options = {})
  # change stream cursors can only be :iterable, so we don't allow
  # timeout_mode to be specified.
  perform_setup(view, options, forbid: %i[ timeout_mode ]) do
    @changes_for = changes_for
    @change_stream_filters = pipeline && pipeline.dup
    @start_after = @options[:start_after]
  end

  # The resume token tracked by the change stream, used only
  # when there is no cursor, or no cursor resume token
  @resume_token = @start_after || @options[:resume_after]

  create_cursor!

  # We send different parameters when we resume a change stream
  # compared to when we send the first query
  @resuming = true
end
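Two details of the constructor are easy to miss: the caller's pipeline is copied (pipeline && pipeline.dup, which also passes nil through unchanged), and the initial resume token prefers :start_after over :resume_after. The sketch below reproduces only that bookkeeping in a hypothetical StreamSetupSketch class; it opens no cursor, and the token value is a placeholder.

```ruby
# Hypothetical stand-in for the constructor's option handling; no cursor is created.
class StreamSetupSketch
  attr_reader :filters, :resume_token

  def initialize(pipeline, options = {})
    # Shallow copy, so later pushes onto the caller's Array are not seen; nil stays nil.
    @filters = pipeline && pipeline.dup
    @start_after = options[:start_after]
    # Same precedence as the constructor above: :start_after wins over :resume_after.
    @resume_token = @start_after || options[:resume_after]
  end
end

user_pipeline = [{ '$match' => { 'operationType' => 'insert' } }]
setup = StreamSetupSketch.new(user_pipeline, resume_after: { '_data' => 'placeholder' })
user_pipeline << { '$project' => { 'fullDocument' => 1 } }
setup.filters.size # => 1
setup.resume_token # the :resume_after placeholder, since :start_after was not given
```

The real constructor then sets @resuming only after create_cursor! returns, so the first query and every later resume build different $changeStream stages.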

Instance Attribute Details

#closed? ⇒ true, false (readonly)

Is the change stream closed?

Examples:

Determine whether the change stream is closed.

stream.closed?

Returns:

  • (true, false)

    If the change stream is closed.

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 273

def closed?
  @cursor.nil?
end

#cursor ⇒ Cursor (readonly)

This method is for internal use only.

Returns:

  • (Cursor)

    the underlying cursor for this operation

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 67

attr_reader :cursor

#for_cluster? ⇒ Boolean (readonly, private)

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 328

def for_cluster?
  @changes_for == CLUSTER
end

#for_collection? ⇒ Boolean (readonly, private)

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 336

def for_collection?
  !for_cluster? && !for_database?
end

#for_database? ⇒ Boolean (readonly, private)

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 332

def for_database?
  @changes_for == DATABASE
end

#options ⇒ BSON::Document (readonly)

Returns:

  • (BSON::Document)

    The change stream options.

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 63

attr_reader :options

#resuming? ⇒ Boolean (readonly, private)

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 456

def resuming?
  !!@resuming
end

Instance Method Details

#aggregate_spec(session, read_preference) (private)

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 379

def aggregate_spec(session, read_preference)
  super(session, read_preference).tap do |spec|
    spec[:selector][:aggregate] = 1 unless for_collection?
  end
end
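The scope predicates and this aggregate_spec override work together: only a collection-level stream names a collection in the aggregate field, while database- and cluster-level streams send aggregate: 1 (and change_doc additionally adds allChangesForCluster for cluster streams). The sketch below models this with a hypothetical SketchScope struct. The :cluster and :database symbols stand in for the driver's CLUSTER and DATABASE constants, whose actual values are not shown on this page, and it assumes the inherited spec otherwise names the collection.

```ruby
# :cluster and :database stand in for the driver's CLUSTER and DATABASE constants.
SketchScope = Struct.new(:changes_for) do
  def for_cluster?
    changes_for == :cluster
  end

  def for_database?
    changes_for == :database
  end

  # Anything that is neither cluster- nor database-wide watches one collection.
  def for_collection?
    !for_cluster? && !for_database?
  end

  # Mirrors the override above: only collection-level streams name a collection.
  def aggregate_value(collection_name)
    for_collection? ? collection_name : 1
  end
end

SketchScope.new(:database).aggregate_value('orders')   # => 1
SketchScope.new(:collection).aggregate_value('orders') # => "orders"
```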

#change_doc (private)

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 385

def change_doc
  {}.tap do |doc|
    if @options[:full_document]
      doc[:fullDocument] = @options[:full_document]
    end

    if @options[:full_document_before_change]
      doc[:fullDocumentBeforeChange] = @options[:full_document_before_change]
    end

    if @options.key?(:show_expanded_events)
      doc[:showExpandedEvents] = @options[:show_expanded_events]
    end

    if resuming?
      # We have a resume token once we retrieved any documents.
      # However, if the first getMore fails and the user didn't pass
      # a resume token we won't have a resume token to use.
      # Use start_at_operation time in this case
      if resume_token
        # Spec says we need to remove both startAtOperationTime and startAfter if
        # either was passed in by user, thus we won't forward them
        doc[:resumeAfter] = resume_token
      elsif @start_at_operation_time_supported && @start_at_operation_time
        # It is crucial to check @start_at_operation_time_supported
        # here - we may have switched to an older server that
        # does not support operation times and therefore shouldn't
        # try to send one to it!
        #
        # @start_at_operation_time is already a BSON::Timestamp
        doc[:startAtOperationTime] = @start_at_operation_time
      else
        # Can't resume if we don't have either
        raise Mongo::Error::MissingResumeToken
      end
    else
      if @start_after
        doc[:startAfter] = @start_after
      elsif resume_token
        doc[:resumeAfter] = resume_token
      end

      if options[:start_at_operation_time]
        doc[:startAtOperationTime] = time_to_bson_timestamp(
          options[:start_at_operation_time])
      end
    end

    doc[:allChangesForCluster] = true if for_cluster?
  end
end
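The branching in change_doc is the core of resumption: on the first query the user's :start_after (or resume token) and :start_at_operation_time are forwarded, but on a resume only one starting point is sent, preferring the resume token over the cached operation time. The sketch below restates that logic over plain Hashes. sketch_change_doc, its state Hash, and MissingResumeTokenSketch are hypothetical stand-ins for the stream's instance variables and for Mongo::Error::MissingResumeToken, and the Time-to-timestamp conversion is left out.

```ruby
# Stand-in for Mongo::Error::MissingResumeToken.
class MissingResumeTokenSketch < StandardError; end

# options: the user's change stream options (snake_case keys).
# state:   hypothetical Hash with :resuming, :resume_token, :op_time,
#          :op_time_supported and :for_cluster.
def sketch_change_doc(options, state)
  doc = {}
  doc[:fullDocument] = options[:full_document] if options[:full_document]
  if options[:full_document_before_change]
    doc[:fullDocumentBeforeChange] = options[:full_document_before_change]
  end
  doc[:showExpandedEvents] = options[:show_expanded_events] if options.key?(:show_expanded_events)

  if state[:resuming]
    if state[:resume_token]
      # A resume sends only resumeAfter; startAfter and startAtOperationTime are dropped.
      doc[:resumeAfter] = state[:resume_token]
    elsif state[:op_time_supported] && state[:op_time]
      doc[:startAtOperationTime] = state[:op_time]
    else
      raise MissingResumeTokenSketch, 'cannot resume without a token or operation time'
    end
  else
    if options[:start_after]
      doc[:startAfter] = options[:start_after]
    elsif state[:resume_token]
      doc[:resumeAfter] = state[:resume_token]
    end
    doc[:startAtOperationTime] = options[:start_at_operation_time] if options[:start_at_operation_time]
  end

  doc[:allChangesForCluster] = true if state[:for_cluster]
  doc
end

first = sketch_change_doc({ start_after: 'T0' }, { resuming: false })
resumed = sketch_change_doc({ start_after: 'T0' }, { resuming: true, resume_token: 'T5' })
# first carries startAfter: 'T0'; resumed carries only resumeAfter: 'T5'
```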

#close(opts = {}) ⇒ nil

Note:

This method attempts to close the cursor used by the change stream, which in turn closes the server-side change stream cursor. Operation failures, socket errors, socket timeouts, and missing-connection errors raised while closing the server-side cursor are ignored.

Close the change stream.

Examples:

Close the change stream.

stream.close

Returns:

  • (nil)

    Always nil.

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 254

def close(opts = {})
  unless closed?
    begin
      @cursor.close(opts)
    rescue Error::OperationFailure::Family, Error::SocketError, Error::SocketTimeoutError, Error::MissingConnection
      # ignore
    end
    @cursor = nil
  end
end
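close follows a common idempotent-shutdown pattern: it does nothing when the stream is already closed, tolerates a fixed set of errors from the server-side close, and always drops the cursor reference so that closed? becomes true. A minimal sketch, assuming a hypothetical CloseSketch class and a TransientCloseError in place of the driver's operation, socket, and connection errors:

```ruby
class CloseSketch
  # Stand-in for the operation, socket, and connection errors the driver rescues.
  class TransientCloseError < StandardError; end

  def initialize(cursor)
    @cursor = cursor
  end

  def closed?
    @cursor.nil?
  end

  def close
    unless closed?
      begin
        @cursor.close
      rescue TransientCloseError
        # Ignored: the stream counts as closed regardless.
      end
      @cursor = nil
    end
    nil
  end
end

# A throwaway cursor whose close always fails.
flaky = Object.new
def flaky.close
  raise CloseSketch::TransientCloseError, 'connection reset'
end

stream = CloseSketch.new(flaky)
stream.close
stream.closed? # => true
stream.close   # a second call is a no-op
```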

#create_cursor!(timeout_ms = nil) (private)

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 340

def create_cursor!(timeout_ms = nil)
  # clear the cache because we may get a newer or an older server
  # (rolling upgrades)
  @start_at_operation_time_supported = nil

  session = client.get_session(@options)
  context = Operation::Context.new(client: client, session: session, view: self, operation_timeouts: timeout_ms ? { operation_timeout_ms: timeout_ms } : operation_timeouts)

  start_at_operation_time = nil
  start_at_operation_time_supported = nil

  @cursor = read_with_retry_cursor(session, server_selector, self, context: context) do |server|
    server.with_connection do |connection|
      start_at_operation_time_supported = connection.description.server_version_gte?('4.0')

      result = send_initial_query(connection, context)

      if doc = result.replies.first && result.replies.first.documents.first
        start_at_operation_time = doc['operationTime']
      else
        # The above may set @start_at_operation_time to nil
        # if it was not in the document for some reason,
        # for consistency set it to nil here as well.
        # NB: since this block may be executed more than once, each
        # execution must write to start_at_operation_time either way.
        start_at_operation_time = nil
      end
      result
    end
  end

  @start_at_operation_time = start_at_operation_time
  @start_at_operation_time_supported = start_at_operation_time_supported
end
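The comments inside create_cursor! point at a subtle property of retried blocks: because read_with_retry_cursor may run the block more than once, each run must assign start_at_operation_time (and start_at_operation_time_supported), or a value captured by a failed attempt could leak into the final state. The sketch below shows the pattern with a hypothetical with_retry helper, a simplified stand-in for the driver's retry machinery that retries only on IOError, and simulated replies.

```ruby
# Hypothetical retry helper: runs the block up to `attempts` times on IOError.
def with_retry(attempts: 2)
  tries = 0
  begin
    tries += 1
    yield tries
  rescue IOError
    retry if tries < attempts
    raise
  end
end

# replies: one simulated reply Hash per attempt; 'fail' => true simulates an
# error raised after the reply was partially processed.
def open_cursor_sketch(replies)
  op_time = nil
  reply = with_retry do |attempt|
    r = replies.fetch(attempt - 1)
    # Assigned on every run, even when the key is missing, like the else branch above.
    op_time = r['operationTime']
    raise IOError, 'simulated network error' if r['fail']
    r
  end
  [reply, op_time]
end

_reply, op_time = open_cursor_sketch([{ 'operationTime' => 100, 'fail' => true }, {}])
op_time # => nil, not the stale 100 from the failed attempt
```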

#cursor_type ⇒ Object

“change streams are an abstraction around tailable-awaitData cursors…”

Returns:

  • :tailable_await

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 307

def cursor_type
  :tailable_await
end

#each {|document| ... } ⇒ Enumerator

Iterate through documents returned by the change stream.

On resumable errors, this method retries once per error: if two errors occur consecutively, the second one is raised, and an error that is successfully recovered from resets the error count to zero.

Examples:

Iterate through the stream of documents.

stream.each do |document|
  p document
end

Yield Parameters:

  • document (BSON::Document)

    Each change stream document.

Returns:

  • (Enumerator)

    The enumerator.

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 169

def each
  raise StopIteration.new if closed?
  loop do
    document = try_next
    yield document if document
  end
rescue StopIteration
  return self
end
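#each is a thin loop over #try_next: nil (no change within the await window) is skipped, and StopIteration ends the iteration. Note that Kernel#loop itself rescues a StopIteration raised inside its block, so the method-level rescue only handles the up-front closed? check. The sketch below uses a hypothetical in-memory EachSketch class in which running out of queued events stands in for the stream being closed.

```ruby
# Hypothetical in-memory stream; nil entries model an await window with no changes.
class EachSketch
  def initialize(events)
    @events = events
    @closed = false
  end

  def close
    @closed = true
  end

  # Running out of events stands in for the stream being closed.
  def try_next
    raise StopIteration if @closed || @events.empty?
    @events.shift
  end

  def each
    raise StopIteration if @closed
    # Kernel#loop rescues a StopIteration raised by try_next and stops quietly.
    loop do
      document = try_next
      yield document if document
    end
  rescue StopIteration
    # Reached only for the closed check at the top.
    self
  end
end

seen = []
EachSketch.new([{ 'n' => 1 }, nil, { 'n' => 2 }]).each { |doc| seen << doc['n'] }
seen # => [1, 2]
```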

#inspect ⇒ String

Get a formatted string for use in inspection.

Examples:

Inspect the change stream object.

stream.inspect

Returns:

  • (String)

    The change stream inspection.

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 285

def inspect
  "#<Mongo::Collection::View:ChangeStream:0x#{object_id} filters=#{@change_stream_filters} " +
    "options=#{@options} resume_token=#{resume_token}>"
end

#max_await_time_ms ⇒ Integer | nil

Returns the value of the max_await_time_ms option that was passed to this change stream.

Returns:

  • (Integer | nil)

    the max_await_time_ms value

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 322

def max_await_time_ms
  options[:max_await_time_ms]
end

#pipeline (private)

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 375

def pipeline
  [{ '$changeStream' => change_doc }] + @change_stream_filters
end
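The assembled pipeline always starts with the $changeStream stage (the server requires it to be the first stage), followed by the user's filter stages. Because Array#+ returns a new Array, the stored filters are left untouched each time the pipeline is rebuilt for a resume. A minimal sketch with a hypothetical sketch_pipeline helper:

```ruby
# Hypothetical helper mirroring #pipeline: the stage document comes first.
def sketch_pipeline(change_doc, filters)
  [{ '$changeStream' => change_doc }] + filters
end

filters = [{ '$match' => { 'operationType' => 'update' } }]
full = sketch_pipeline({ fullDocument: 'updateLookup' }, filters)
full.first.keys # => ["$changeStream"]
full.size       # => 2
filters.size    # => 1, the stored filters are untouched
```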

#recreate_cursor!(context = nil) (private)

Recreates the current cursor, typically as a consequence of attempting to resume the change stream.

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 462

def recreate_cursor!(context = nil)
  @timed_out = false

  close
  create_cursor!(context&.remaining_timeout_ms)
end

#resume_token ⇒ BSON::Document | nil

Returns the resume token that the stream will use to automatically resume, if one exists.

Examples:

Get the change stream resume token.

stream.resume_token

Returns:

  • (BSON::Document | nil)

    The change stream resume token.

Since:

  • 2.10.0

# File 'lib/mongo/collection/view/change_stream.rb', line 299

def resume_token
  cursor_resume_token = @cursor.resume_token if @cursor
  cursor_resume_token || @resume_token
end
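#resume_token prefers the token tracked by the live cursor and falls back to the one stored on the stream (set in the constructor and refreshed in #try_next after an error). Once the stream is closed and @cursor is nil, only the stored token remains. A sketch with hypothetical names (SketchCursor, sketch_resume_token):

```ruby
# Hypothetical cursor exposing only a resume token.
SketchCursor = Struct.new(:resume_token)

# cursor:       the live cursor, or nil once the stream is closed.
# stored_token: the token kept on the stream itself.
def sketch_resume_token(cursor, stored_token)
  cursor_token = cursor.resume_token if cursor
  cursor_token || stored_token
end

sketch_resume_token(SketchCursor.new('cursor-token'), 'stored-token') # => "cursor-token"
sketch_resume_token(SketchCursor.new(nil), 'stored-token')            # => "stored-token"
sketch_resume_token(nil, 'stored-token')                              # => "stored-token"
```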

#send_initial_query(connection, context) (private)

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 437

def send_initial_query(connection, context)
  initial_query_op(context.session, view.read_preference)
    .execute_with_connection(
      connection,
      context: context,
    )
end

#time_to_bson_timestamp(time) (private)

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 445

def time_to_bson_timestamp(time)
  if time.is_a?(Time)
    seconds = time.to_f
    BSON::Timestamp.new(seconds.to_i, ((seconds - seconds.to_i) * 1000000).to_i)
  elsif time.is_a?(BSON::Timestamp)
    time
  else
    raise ArgumentError, 'Time must be a Time or a BSON::Timestamp instance'
  end
end

#timeout_mode ⇒ Object

“change streams…implicitly use ITERATION mode”

Returns:

  • :iteration

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 314

def timeout_mode
  :iteration
end

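#to_enum

Returns an Enumerator over the change stream that additionally responds to #try_next, delegating that call to this change stream.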

Since:

  • 2.5.0

# File 'lib/mongo/collection/view/change_stream.rb', line 227

def to_enum
  enum = super
  enum.send(:instance_variable_set, '@obj', self)
  class << enum
    def try_next
      @obj.try_next
    end
  end
  enum
end
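#to_enum starts from the ordinary Enumerator produced by Kernel#to_enum, stores the change stream inside it, and adds a singleton #try_next, so code that only holds the Enumerator can still call #try_next. The sketch below applies the same technique to a hypothetical EnumSketch class; it calls instance_variable_set directly because that method is public, whereas the driver goes through send.

```ruby
class EnumSketch
  def initialize(items)
    @items = items
  end

  def each(&block)
    @items.each(&block)
    self
  end

  # Peek standing in for ChangeStream#try_next.
  def try_next
    @items.first
  end

  def to_enum
    enum = super
    # Keep a back-reference to the source object inside the Enumerator...
    enum.instance_variable_set(:@obj, self)
    # ...and give this one Enumerator instance a try_next that delegates to it.
    class << enum
      def try_next
        @obj.try_next
      end
    end
    enum
  end
end

enum = EnumSketch.new([1, 2]).to_enum
enum.try_next # => 1
enum.to_a     # => [1, 2]
```

Because the method is defined on the Enumerator's singleton class, other Enumerators are unaffected.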

#try_next ⇒ BSON::Document | nil

Return one document from the change stream, if one is available.

Retries once on a resumable error.

Raises StopIteration if the change stream is closed.

This method will wait up to max_await_time_ms milliseconds for changes from the server, and if no changes are received it will return nil.

Returns:

  • (BSON::Document | nil)

    A change stream document.

Raises:

  • (StopIteration)

Since:

  • 2.6.0

# File 'lib/mongo/collection/view/change_stream.rb', line 191

def try_next
  recreate_cursor! if @timed_out

  raise StopIteration.new if closed?

  begin
    doc = @cursor.try_next
  rescue Mongo::Error => e
    # "If a next call fails with a timeout error, drivers MUST NOT
    # invalidate the change stream. The subsequent next call MUST
    # perform a resume attempt to establish a new change stream on the
    # server..."
    #
    # However, SocketTimeoutErrors are TimeoutErrors, but are also
    # change-stream-resumable. To preserve existing (specified) behavior,
    # We only count timeouts when the error is not also
    # change-stream-resumable.
    @timed_out = e.is_a?(Mongo::Error::TimeoutError) && !e.change_stream_resumable?

    raise unless @timed_out || e.change_stream_resumable?

    @resume_token = @cursor.resume_token
    raise e if @timed_out

    recreate_cursor!(@cursor.context)
    retry
  end

  # We need to verify each doc has an _id, so we
  # have a resume token to work with
  if doc && doc['_id'].nil?
    raise Error::MissingResumeToken
  end
  doc
end
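The error handling in #try_next reduces to a small decision table over two properties of the error: whether it is a timeout and whether it is change-stream-resumable. The sketch below encodes that table in a hypothetical sketch_error_action helper whose return symbols are illustrative labels, not driver values. In both recoverable cases the driver first saves the cursor's resume token before acting.

```ruby
# timeout:   the error is a Mongo::Error::TimeoutError
# resumable: the error reports change_stream_resumable?
def sketch_error_action(timeout:, resumable:)
  # A socket timeout is both a timeout and resumable; it is treated as resumable.
  timed_out = timeout && !resumable
  # Neither a pure timeout nor resumable: propagate unchanged.
  return :raise unless timed_out || resumable
  # Pure timeout: raise now; the next try_next call recreates the cursor.
  return :raise_then_resume_on_next_call if timed_out

  # Resumable: recreate the cursor and retry immediately.
  :resume_now_and_retry
end

sketch_error_action(timeout: true, resumable: true) # => :resume_now_and_retry
```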