123456789_123456789_123456789_123456789_123456789_

Class: Mongo::Collection::View::Aggregation

Relationships & Source Files
Extension / Inclusion / Inheritance Descendants
Subclasses:
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
self, Forwardable
Instance Chain:
Inherits: Object
Defined in: lib/mongo/collection/view/aggregation.rb

Overview

Provides behavior around an aggregation pipeline on a collection view.

Since:

  • 2.0.0

Constant Summary

Explainable - Included

ALL_PLANS_EXECUTION, EXECUTION_STATS, QUERY_PLANNER

::Mongo::Loggable - Included

PREFIX

Class Method Summary

Instance Attribute Summary

Explainable - Included

Iterable - Included

#cursor

Returns the cursor associated with this view, if any.

#use_query_cache?

Immutable - Included

Instance Method Summary

::Mongo::Retryable - Included

#read_worker

Returns the read worker for handling retryable reads.

#select_server

This is a separate method to make it possible for the test suite to assert that server selection is performed during retry attempts.

#write_worker

Returns the write worker for handling retryable writes.

::Mongo::Loggable - Included

#log_debug

Convenience method to log debug messages with the standard prefix.

#log_error

Convenience method to log error messages with the standard prefix.

#log_fatal

Convenience method to log fatal messages with the standard prefix.

#log_info

Convenience method to log info messages with the standard prefix.

#log_warn

Convenience method to log warn messages with the standard prefix.

#logger

Get the logger instance.

#_mongo_log_prefix, #format_message

Explainable - Included

#explain

Get the query plan for the query.

#explain_options

Iterable - Included

#close_query

Cleans up resources associated with this query.

#each

Iterate through documents returned by a query with this View.

#kill_cursors
#cache_options, #cached_cursor, #initial_query_op,
#maybe_set_tailable_options

Add tailable cusror options to the command specifiction if needed.

#select_cursor, #send_initial_query

Immutable - Included

Constructor Details

.new(view, pipeline, options = {}) ⇒ Aggregation

Initialize the aggregation for the provided collection view, pipeline and options.

Examples:

Create the new aggregation view.

Aggregation.view.new(view, pipeline)

Parameters:

  • view (Collection::View)

    The collection view.

  • pipeline (Array<Hash>)

    The pipeline of operations.

  • options (Hash) (defaults to: {})

    The aggregation options.

Options Hash (options):

  • :allow_disk_use (true, false)

    Set to true if disk usage is allowed during the aggregation.

  • :batch_size (Integer)

    The number of documents to return per batch.

  • :bypass_document_validation (true, false)

    Whether or not to skip document level validation.

  • :collation (Hash)

    The collation to use.

  • :comment (Object)

    A user-provided comment to attach to this command.

  • :hint (String)

    The index to use for the aggregation.

  • :let (Hash)

    Mapping of variables to use in the pipeline. See the server documentation for details.

  • :max_time_ms (Integer)

    The maximum amount of time in milliseconds to allow the aggregation to run.

  • :use_cursor (true, false)

    Indicates whether the command will request that the server provide results using a cursor. Note that as of server version 3.6, aggregations always provide results using a cursor and this option is therefore not valid.

  • :session (Session)

    The session to use.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 97

def initialize(view, pipeline, options = {})
  @view = view
  @pipeline = pipeline.dup
  unless Mongo.broken_view_aggregate || view.filter.empty?
    @pipeline.unshift(:$match => view.filter)
  end
  @options = BSON::Document.new(options).freeze
end

Instance Attribute Details

#pipelineArray<Hash> (readonly)

Returns:

  • (Array<Hash>)

    pipeline The aggregation pipeline.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 37

attr_reader :pipeline

#viewView (readonly)

Returns:

  • (View)

    view The collection view.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 35

attr_reader :view

#write?Boolean (readonly)

This method is for internal use only.

Whether this aggregation will write its result to a database collection.

Returns:

  • (Boolean)

    Whether the aggregation will write its result to a collection.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 124

def write?
  pipeline.any? { |op| op.key?('$out') || op.key?(:$out) || op.key?('$merge') || op.key?(:$merge) }
end

Instance Method Details

#aggregate_spec(session, read_preference) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 134

def aggregate_spec(session, read_preference)
  Builder::Aggregation.new(
    pipeline,
    view,
    options.merge(session: session, read_preference: read_preference)
  ).specification
end

#allow_disk_use(value = nil) ⇒ true, ...

Set to true if disk usage is allowed during the aggregation.

Examples:

Set disk usage flag.

aggregation.allow_disk_use(true)

Parameters:

  • value (true, false) (defaults to: nil)

    The flag value.

Returns:

  • (true, false, Aggregation)

    The aggregation if a value was set or the value if used as a getter.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 62

def allow_disk_use(value = nil)
  configure(:allow_disk_use, value)
end

#cache_options (private)

Skip, sort, limit, projection are specified as pipeline stages rather than as options.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 197

def cache_options
  {
    namespace: collection.namespace,
    selector: pipeline,
    read_concern: view.read_concern,
    read_preference: view.read_preference,
    collation: options[:collation],
    # Aggregations can read documents from more than one collection,
    # so they will be cleared on every write operation.
    multi_collection: true,
  }
end

#effective_read_preference(connection) ⇒ Hash | nil (private)

Return effective read preference for the operation.

If the pipeline contains $merge or $out, and read preference specified by user is secondary or secondary_preferred, and target server is below 5.0, than this method returns primary read preference, because the aggregation will be routed to primary. Otherwise return the original read preference.

See github.com/mongodb/specifications/blob/master/source/crud/crud.rst#read-preferences-and-server-selection

Parameters:

  • connection (Server::Connection)

    The connection which will be used for the operation.

Returns:

  • (Hash | nil)

    read preference hash that should be sent with this command.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 164

def effective_read_preference(connection)
  return unless view.read_preference
  return view.read_preference unless write?
  return view.read_preference unless [:secondary, :secondary_preferred].include?(view.read_preference[:mode])

  primary_read_preference = {mode: :primary}
  description = connection.description
  if description.primary?
    log_warn("Routing the Aggregation operation to the primary server")
    primary_read_preference
  elsif description.mongos? && !description.features.merge_out_on_secondary_enabled?
    log_warn("Routing the Aggregation operation to the primary server")
    primary_read_preference
  else
    view.read_preference
  end

end

#explainHash

Get the explain plan for the aggregation.

Examples:

Get the explain plan for the aggregation.

aggregation.explain

Returns:

  • (Hash)

    The explain plan.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 114

def explain
  self.class.new(view, pipeline, options.merge(explain: true)).first
end

#initial_query_op(session, read_preference) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 146

def initial_query_op(session, read_preference)
  Operation::Aggregate.new(aggregate_spec(session, read_preference))
end

#new(options) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 142

def new(options)
  Aggregation.new(view, pipeline, options)
end

#send_initial_query(server, session) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 183

def send_initial_query(server, session)
  server.with_connection do |connection|
    initial_query_op(
      session,
      effective_read_preference(connection)
    ).execute_with_connection(
      connection,
      context: Operation::Context.new(client: client, session: session)
    )
  end
end

#server_selector (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/aggregation.rb', line 130

def server_selector
  @view.send(:server_selector)
end