123456789_123456789_123456789_123456789_123456789_

Class: Mongo::Collection::View::Aggregation

Relationships & Source Files
Namespace Children
Modules:
Extension / Inclusion / Inheritance Descendants
Subclasses:
Super Chains via Extension / Inclusion / Inheritance
Instance Chain:
Inherits: Object
Defined in: lib/mongo/collection/view/aggregation.rb,
lib/mongo/collection/view/aggregation/behavior.rb

Overview

Provides behavior around an aggregation pipeline on a collection view.

Since:

  • 2.0.0

Constant Summary

Explainable - Included

ALL_PLANS_EXECUTION, EXECUTION_STATS, QUERY_PLANNER

::Mongo::Loggable - Included

PREFIX

Class Method Summary

Instance Attribute Summary

Behavior - Included

#view,
#write?

Whether this aggregation will write its result to a database collection.

Explainable - Included

Iterable - Included

#prefer_cached_cursor?

If the caching cursor is closed and was not fully iterated, the documents we have in it are not the complete result set and we have no way of completing that iteration.

#use_query_cache?

::Mongo::CursorHost - Included

#cursor

Returns the cursor associated with this view, if any.

#timeout_mode

Immutable - Included

Instance Method Summary

Behavior - Included

#allow_disk_use

Set to true if disk usage is allowed during the aggregation.

#explain

Get the explain plan for the aggregation.

#timeout_ms, #aggregate_spec,
#cache_options

Skip, sort, limit, projection are specified as pipeline stages rather than as options.

#operation_timeouts,
#perform_setup

Common setup for all classes that include this behavior; the constructor should invoke this method.

#server_selector

::Mongo::Retryable - Included

#read_worker

Returns the read worker for handling retryable reads.

#select_server

This is a separate method to make it possible for the test suite to assert that server selection is performed during retry attempts.

#write_worker

Returns the write worker for handling retryable writes.

::Mongo::Loggable - Included

#log_debug

Convenience method to log debug messages with the standard prefix.

#log_error

Convenience method to log error messages with the standard prefix.

#log_fatal

Convenience method to log fatal messages with the standard prefix.

#log_info

Convenience method to log info messages with the standard prefix.

#log_warn

Convenience method to log warn messages with the standard prefix.

#logger

Get the logger instance.

#_mongo_log_prefix, #format_message

Explainable - Included

#explain

Get the query plan for the query.

#explain_options

Iterable - Included

#close_query

Cleans up resources associated with this query.

#each

Iterate through documents returned by a query with this ::Mongo::Collection::View.

#kill_cursors
#cache_options, #cached_cursor, #compute_limit_for_cached_query, #initial_query_op,
#maybe_set_tailable_options

Add tailable cusror options to the command specifiction if needed.

#new_cursor_for_iteration

Start a new cursor for use when iterating (via #each).

#oplog_replay, #select_cursor, #send_initial_query

::Mongo::CursorHost - Included

#validate_timeout_mode!

Ensure the timeout mode is appropriate for other options that have been given.

Immutable - Included

Constructor Details

.new(view, pipeline, options = {}) ⇒ Aggregation

Initialize the aggregation for the provided collection view, pipeline and options.

Examples:

Create the new aggregation view.

Aggregation.view.new(view, pipeline)

Parameters:

  • view (Collection::View)

    The collection view.

  • pipeline (Array<Hash>)

    The pipeline of operations.

  • options (Hash) (defaults to: {})

    The aggregation options.

Options Hash (options):

  • :allow_disk_use (true, false)

    Set to true if disk usage is allowed during the aggregation.

  • :batch_size (Integer)

    The number of documents to return per batch.

  • :bypass_document_validation (true, false)

    Whether or not to skip document level validation.

  • :collation (Hash)

    The collation to use.

  • :comment (Object)

    A user-provided comment to attach to this command.

  • :hint (String)

    The index to use for the aggregation.

  • :let (Hash)

    Mapping of variables to use in the pipeline. See the server documentation for details.

  • :max_time_ms (Integer)

    The maximum amount of time in milliseconds to allow the aggregation to run. This option is deprecated, use :timeout_ms instead.

  • :session (Session)

    The session to use.

  • :timeout_mode (:cursor_lifetime | :iteration)

    How to interpret :timeout_ms (whether it applies to the lifetime of the cursor, or per iteration).

  • :timeout_ms (Integer)

    The operation timeout in milliseconds. Must be a non-negative integer. An explicit value of 0 means infinite. The default value is unset which means the value is inherited from the collection or the database or the client.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 68

def initialize(view, pipeline, options = {})
  perform_setup(view, options) do
    @pipeline = pipeline.dup
    unless Mongo.broken_view_aggregate || view.filter.empty?
      @pipeline.unshift(:$match => view.filter)
    end
  end
end

Instance Attribute Details

#pipelineArray<Hash> (readonly)

Returns:

  • (Array<Hash>)

    pipeline The aggregation pipeline.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 31

attr_reader :pipeline

Instance Method Details

#effective_read_preference(connection) ⇒ Hash | nil (private)

Return effective read preference for the operation.

If the pipeline contains $merge or $out, and read preference specified by user is secondary or secondary_preferred, and target server is below 5.0, than this method returns primary read preference, because the aggregation will be routed to primary. Otherwise return the original read preference.

See github.com/mongodb/specifications/blob/master/source/crud/crud.md#read-preferences-and-server-selection

Parameters:

  • connection (Server::Connection)

    The connection which will be used for the operation.

Returns:

  • (Hash | nil)

    read preference hash that should be sent with this command.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 101

def effective_read_preference(connection)
  return unless view.read_preference
  return view.read_preference unless write?
  return view.read_preference unless [:secondary, :secondary_preferred].include?(view.read_preference[:mode])

  primary_read_preference = {mode: :primary}
  description = connection.description
  if description.primary?
    log_warn("Routing the Aggregation operation to the primary server")
    primary_read_preference
  elsif description.mongos? && !description.features.merge_out_on_secondary_enabled?
    log_warn("Routing the Aggregation operation to the primary server")
    primary_read_preference
  else
    view.read_preference
  end

end

#initial_query_op(session, read_preference) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 83

def initial_query_op(session, read_preference)
  Operation::Aggregate.new(aggregate_spec(session, read_preference))
end

#new(options) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 79

def new(options)
  Aggregation.new(view, pipeline, options)
end

#send_initial_query(server, context) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 120

def send_initial_query(server, context)
  if server.load_balancer?
    # Connection will be checked in when cursor is drained.
    connection = server.pool.check_out(context: context)
    initial_query_op(
      context.session,
      effective_read_preference(connection)
    ).execute_with_connection(
      connection,
      context: context
    )
  else
    server.with_connection do |connection|
      initial_query_op(
        context.session,
        effective_read_preference(connection)
      ).execute_with_connection(
        connection,
        context: context
      )
    end
  end
end