123456789_123456789_123456789_123456789_123456789_

Class: Mongo::Collection::View::ChangeStream

Relationships & Source Files
Namespace Children
Modules:
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
self, Aggregation, Forwardable
Instance Chain:
Inherits: Mongo::Collection::View::Aggregation
Defined in: lib/mongo/collection/view/change_stream.rb,
lib/mongo/collection/view/change_stream/retryable.rb

Overview

Note:

Only available in server versions 3.6 and higher.

Note:

ChangeStreams do not work properly with JRuby because of the issue documented here: github.com/jruby/jruby/issues/4212. Namely, JRuby eagerly evaluates #next on an Enumerator in a background green thread, therefore calling #next on the change stream will cause getMores to be called in a loop in the background.

Provides behavior around a ‘$changeStream` pipeline stage in the aggregation framework. Specifying this stage allows users to request that notifications are sent for all changes to a particular collection or database.

Since:

  • 2.5.0

Constant Summary

Explainable - Included

ALL_PLANS_EXECUTION, EXECUTION_STATS, QUERY_PLANNER

::Mongo::Loggable - Included

PREFIX

Aggregation - Inherited

REROUTE

Class Method Summary

Aggregation - Inherited

.new

Initialize the aggregation for the provided collection view, pipeline and options.

Instance Attribute Summary

Aggregation - Inherited

#pipeline, #view,
#write?

Whether this aggregation will write its result to a database collection.

Explainable - Included

Iterable - Included

#cursor

Returns the cursor associated with this view, if any.

#use_query_cache?

Immutable - Included

Instance Method Summary

Aggregation - Inherited

#allow_disk_use

Set to true if disk usage is allowed during the aggregation.

#explain

Get the explain plan for the aggregation.

#aggregate_spec,
#cache_options

Skip, sort, limit, projection are specified as pipeline stages rather than as options.

#effective_read_preference

Return effective read preference for the operation.

#initial_query_op, #new, #send_initial_query, #server_selector

::Mongo::Retryable - Included

#read_worker

Returns the read worker for handling retryable reads.

#select_server

This is a separate method to make it possible for the test suite to assert that server selection is performed during retry attempts.

#write_worker

Returns the write worker for handling retryable writes.

::Mongo::Loggable - Included

#log_debug

Convenience method to log debug messages with the standard prefix.

#log_error

Convenience method to log error messages with the standard prefix.

#log_fatal

Convenience method to log fatal messages with the standard prefix.

#log_info

Convenience method to log info messages with the standard prefix.

#log_warn

Convenience method to log warn messages with the standard prefix.

#logger

Get the logger instance.

#_mongo_log_prefix, #format_message

Explainable - Included

#explain

Get the query plan for the query.

#explain_options

Iterable - Included

#close_query

Cleans up resources associated with this query.

#each

Iterate through documents returned by a query with this View.

#kill_cursors
#cache_options, #cached_cursor, #initial_query_op,
#maybe_set_tailable_options

Add tailable cusror options to the command specifiction if needed.

#select_cursor, #send_initial_query

Immutable - Included

Constructor Details

.new(view, pipeline, changes_for, options = {}) ⇒ ChangeStream

Initialize the change stream for the provided collection view, pipeline and options.

Examples:

Create the new change stream view.

ChangeStream.new(view, pipeline, options)

Parameters:

  • view (Collection::View)

    The collection view.

  • pipeline (Array<Hash>)

    The pipeline of operators to filter the change notifications.

  • options (Hash) (defaults to: {})

    The change stream options.

Options Hash (options):

  • :full_document (String)

    Allowed values: nil, ‘default’, ‘updateLookup’, ‘whenAvailable’, ‘required’.

    The default is to not send a value (i.e. nil), which is equivalent to ‘default’. By default, the change notification for partial updates will include a delta describing the changes to the document.

    When set to ‘updateLookup’, the change notification for partial updates will include both a delta describing the changes to the document as well as a copy of the entire document that was changed from some time after the change occurred.

    When set to ‘whenAvailable’, configures the change stream to return the post-image of the modified document for replace and update change events if the post-image for this event is available.

    When set to ‘required’, the same behavior as ‘whenAvailable’ except that an error is raised if the post-image is not available.

  • :full_document_before_change (String)

    Allowed values: nil, ‘whenAvailable’, ‘required’, ‘off’.

    The default is to not send a value (i.e. nil), which is equivalent to ‘off’.

    When set to ‘whenAvailable’, configures the change stream to return the pre-image of the modified document for replace, update, and delete change events if it is available.

    When set to ‘required’, the same behavior as ‘whenAvailable’ except that an error is raised if the pre-image is not available.

  • :resume_after (BSON::Document, Hash)

    Specifies the logical starting point for the new change stream.

  • :max_await_time_ms (Integer)

    The maximum amount of time for the server to wait on new documents to satisfy a change stream query.

  • :batch_size (Integer)

    The number of documents to return per batch.

  • :collation (BSON::Document, Hash)

    The collation to use.

  • :start_at_operation_time (BSON::Timestamp)

    Only return changes that occurred at or after the specified timestamp. Any command run against the server will return a cluster time that can be used here. Only recognized by server versions 4.0+.

  • :start_after (Bson::Document, Hash)

    Similar to :resume_after, this option takes a resume token and starts a new change stream returning the first notification after the token. This will allow users to watch collections that have been dropped and recreated or newly renamed collections without missing any notifications.

  • :comment (Object)

    A user-provided comment to attach to this command.

  • :show_expanded_events (Boolean)

    Enables the server to send the ‘expanded’ list of change stream events. The list of additional events included with this flag set are: createIndexes, dropIndexes, modify, create, shardCollection, reshardCollection, refineCollectionShardKey.

    The server will report an error if ‘startAfter` and resumeAfter are both specified.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 127

def initialize(view, pipeline, changes_for, options = {})
  @view = view
  @changes_for = changes_for
  @change_stream_filters = pipeline && pipeline.dup
  @options = options && options.dup.freeze
  @start_after = @options[:start_after]

  # The resume token tracked by the change stream, used only
  # when there is no cursor, or no cursor resume token
  @resume_token = @start_after || @options[:resume_after]

  create_cursor!

  # We send different parameters when we resume a change stream
  # compared to when we send the first query
  @resuming = true
end

Instance Attribute Details

#closed?true, false (readonly)

Is the change stream closed?

Examples:

Determine whether the change stream is closed.

stream.closed?

Returns:

  • (true, false)

    If the change stream is closed.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 256

def closed?
  @cursor.nil?
end

#for_cluster?Boolean (readonly, private)

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 289

def for_cluster?
  @changes_for == CLUSTER
end

#for_collection?Boolean (readonly, private)

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 297

def for_collection?
  !for_cluster? && !for_database?
end

#for_database?Boolean (readonly, private)

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 293

def for_database?
  @changes_for == DATABASE
end

#optionsBSON::Document (readonly)

Returns:

  • (BSON::Document)

    The change stream options.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 61

attr_reader :options

#resuming?Boolean (readonly, private)

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 412

def resuming?
  !!@resuming
end

Instance Method Details

#aggregate_spec(session, read_preference) (private)

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 335

def aggregate_spec(session, read_preference)
  super(session, read_preference).tap do |spec|
    spec[:selector][:aggregate] = 1 unless for_collection?
  end
end

#change_doc (private)

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 341

def change_doc
  {}.tap do |doc|
    if @options[:full_document]
      doc[:fullDocument] = @options[:full_document]
    end

    if @options[:full_document_before_change]
      doc[:fullDocumentBeforeChange] = @options[:full_document_before_change]
    end

    if @options.key?(:show_expanded_events)
      doc[:showExpandedEvents] = @options[:show_expanded_events]
    end

    if resuming?
      # We have a resume token once we retrieved any documents.
      # However, if the first getMore fails and the user didn't pass
      # a resume token we won't have a resume token to use.
      # Use start_at_operation time in this case
      if resume_token
        # Spec says we need to remove both startAtOperationTime and startAfter if
        # either was passed in by user, thus we won't forward them
        doc[:resumeAfter] = resume_token
      elsif @start_at_operation_time_supported && @start_at_operation_time
        # It is crucial to check @start_at_operation_time_supported
        # here - we may have switched to an older server that
        # does not support operation times and therefore shouldn't
        # try to send one to it!
        #
        # @start_at_operation_time is already a BSON::Timestamp
        doc[:startAtOperationTime] = @start_at_operation_time
      else
        # Can't resume if we don't have either
        raise Mongo::Error::MissingResumeToken
      end
    else
      if @start_after
        doc[:startAfter] = @start_after
      elsif resume_token
        doc[:resumeAfter] = resume_token
      end

      if options[:start_at_operation_time]
        doc[:startAtOperationTime] = time_to_bson_timestamp(
          options[:start_at_operation_time])
      end
    end

    doc[:allChangesForCluster] = true if for_cluster?
  end
end

#closenil

Note:

This method attempts to close the cursor used by the change stream, which in turn closes the server-side change stream cursor. This method ignores any errors that occur when closing the server-side cursor.

Close the change stream.

Examples:

Close the change stream.

stream.close

Returns:

  • (nil)

    Always nil.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 237

def close
  unless closed?
    begin
      @cursor.close
    rescue Error::OperationFailure, Error::SocketError, Error::SocketTimeoutError, Error::MissingConnection
      # ignore
    end
    @cursor = nil
  end
end

#create_cursor! (private)

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 301

def create_cursor!
  # clear the cache because we may get a newer or an older server
  # (rolling upgrades)
  @start_at_operation_time_supported = nil

  session = client.send(:get_session, @options)
  start_at_operation_time = nil
  start_at_operation_time_supported = nil
  @cursor = read_with_retry_cursor(session, server_selector, view) do |server|
    server.with_connection do |connection|
      start_at_operation_time_supported = connection.description.server_version_gte?('4.0')

      result = send_initial_query(connection, session)
      if doc = result.replies.first && result.replies.first.documents.first
        start_at_operation_time = doc['operationTime']
      else
        # The above may set @start_at_operation_time to nil
        # if it was not in the document for some reason,
        # for consistency set it to nil here as well.
        # NB: since this block may be executed more than once, each
        # execution must write to start_at_operation_time either way.
        start_at_operation_time = nil
      end
      result
    end
  end
  @start_at_operation_time = start_at_operation_time
  @start_at_operation_time_supported = start_at_operation_time_supported
end

#each {|Each| ... } ⇒ Enumerator

Iterate through documents returned by the change stream.

This method retries once per error on resumable errors (two consecutive errors result in the second error being raised, an error which is recovered from resets the error count to zero).

Examples:

Iterate through the stream of documents.

stream.each do |document|
  p document
end

Yield Parameters:

  • Each (BSON::Document)

    change stream document.

Returns:

  • (Enumerator)

    The enumerator.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 161

def each
  raise StopIteration.new if closed?
  loop do
    document = try_next
    yield document if document
  end
rescue StopIteration
  return self
end

#inspectString

Get a formatted string for use in inspection.

Examples:

Inspect the change stream object.

stream.inspect

Returns:

  • (String)

    The change stream inspection.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 268

def inspect
  "#<Mongo::Collection::View:ChangeStream:0x#{object_id} filters=#{@change_stream_filters} " +
    "options=#{@options} resume_token=#{resume_token}>"
end

#pipeline (private)

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 331

def pipeline
  [{ '$changeStream' => change_doc }] + @change_stream_filters
end

#resume_tokenBSON::Document | nil

Returns the resume token that the stream will use to automatically resume, if one exists.

Examples:

Get the change stream resume token.

stream.resume_token

Returns:

  • (BSON::Document | nil)

    The change stream resume token.

Since:

  • 2.10.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 282

def resume_token
  cursor_resume_token = @cursor.resume_token if @cursor
  cursor_resume_token || @resume_token
end

#send_initial_query(connection, session) (private)

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 393

def send_initial_query(connection, session)
  initial_query_op(session, view.read_preference)
    .execute_with_connection(
      connection,
      context: Operation::Context.new(client: client, session: session),
    )
end

#time_to_bson_timestamp(time) (private)

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 401

def time_to_bson_timestamp(time)
  if time.is_a?(Time)
    seconds = time.to_f
    BSON::Timestamp.new(seconds.to_i, ((seconds - seconds.to_i) * 1000000).to_i)
  elsif time.is_a?(BSON::Timestamp)
    time
  else
    raise ArgumentError, 'Time must be a Time or a BSON::Timestamp instance'
  end
end

#to_enum

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 213

def to_enum
  enum = super
  enum.send(:instance_variable_set, '@obj', self)
  class << enum
    def try_next
      @obj.try_next
    end
  end
  enum
end

#try_nextBSON::Document | nil

Return one document from the change stream, if one is available.

Retries once on a resumable error.

Raises StopIteration if the change stream is closed.

This method will wait up to max_await_time_ms milliseconds for changes from the server, and if no changes are received it will return nil.

Returns:

  • (BSON::Document | nil)

    A change stream document.

Raises:

  • (StopIteration)

Since:

  • 2.6.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/change_stream.rb', line 183

def try_next
  raise StopIteration.new if closed?
  begin
    doc = @cursor.try_next
  rescue Mongo::Error => e
    if !e.change_stream_resumable?
      raise
    end

    # Rerun initial aggregation.
    # Any errors here will stop iteration and break out of this
    # method.

    # Save cursor's resume token so we can use it
    # to create a new cursor
    @resume_token = @cursor.resume_token

    close
    create_cursor!
    retry
  end

  # We need to verify each doc has an _id, so we
  # have a resume token to work with
  if doc && doc['_id'].nil?
    raise Error::MissingResumeToken
  end
  doc
end