Class: Mongo::Collection::View::Aggregation
Relationships & Source Files
Class Chain: self, Forwardable
Inherits: Object
Defined in: lib/mongo/collection/view/aggregation.rb
Overview
Provides behavior around an aggregation pipeline on a collection view.
Constant Summary
- REROUTE = 'Rerouting the Aggregation operation to the primary server.'.freeze
Deprecated. The reroute message.
Explainable
- Included
ALL_PLANS_EXECUTION, EXECUTION_STATS, QUERY_PLANNER
::Mongo::Loggable
- Included
Class Method Summary
-
.new(view, pipeline, options = {}) ⇒ Aggregation
constructor
Initialize the aggregation for the provided collection view, pipeline and options.
Instance Attribute Summary
- #pipeline ⇒ Array<Hash> readonly
- #view ⇒ View readonly
-
#write? ⇒ Boolean
readonly
Internal use only
Whether this aggregation will write its result to a database collection.
Explainable
- Included
Iterable
- Included
#cursor | Returns the cursor associated with this view, if any. |
#use_query_cache? |
Immutable
- Included
Instance Method Summary
-
#allow_disk_use(value = nil) ⇒ true, ...
Set to true if disk usage is allowed during the aggregation.
-
#explain ⇒ Hash
Get the explain plan for the aggregation.
- #aggregate_spec(session, read_preference) private
-
#cache_options
private
Skip, sort, limit, projection are specified as pipeline stages rather than as options.
-
#effective_read_preference(connection) ⇒ Hash | nil
private
Return effective read preference for the operation.
- #initial_query_op(session, read_preference) private
- #new(options) private
- #send_initial_query(server, session) private
- #server_selector private
::Mongo::Retryable
- Included
#read_worker | Returns the read worker for handling retryable reads. |
#select_server | This is a separate method to make it possible for the test suite to assert that server selection is performed during retry attempts. |
#write_worker | Returns the write worker for handling retryable writes. |
::Mongo::Loggable
- Included
#log_debug | Convenience method to log debug messages with the standard prefix. |
#log_error | Convenience method to log error messages with the standard prefix. |
#log_fatal | Convenience method to log fatal messages with the standard prefix. |
#log_info | Convenience method to log info messages with the standard prefix. |
#log_warn | Convenience method to log warn messages with the standard prefix. |
#logger | Get the logger instance. |
#_mongo_log_prefix, #format_message |
Explainable
- Included
#explain | Get the query plan for the query. |
#explain_options |
Iterable
- Included
#close_query | Cleans up resources associated with this query. |
#each | Iterate through documents returned by a query with this |
#kill_cursors | Alias for Iterable#close_query. |
#cache_options, #cached_cursor, #initial_query_op, | |
#maybe_set_tailable_options | Add tailable cursor options to the command specification if needed. |
#select_cursor, #send_initial_query |
Immutable
- Included
Constructor Details
.new(view, pipeline, options = {}) ⇒ Aggregation
Initialize the aggregation for the provided collection view, pipeline and options.
Instance Attribute Details
#pipeline ⇒ Array<Hash> (readonly)
# File 'lib/mongo/collection/view/aggregation.rb', line 37
attr_reader :pipeline
#view ⇒ View (readonly)
# File 'lib/mongo/collection/view/aggregation.rb', line 35
attr_reader :view
#write? ⇒ Boolean (readonly)
Whether this aggregation will write its result to a database collection.
# File 'lib/mongo/collection/view/aggregation.rb', line 124
def write?
  pipeline.any? { |op| op.key?('$out') || op.key?(:$out) || op.key?('$merge') || op.key?(:$merge) }
end
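The stage check above can be tried outside the driver. The following is a minimal sketch, not the driver's code: `writes_to_collection?` and `WRITE_STAGES` are hypothetical names introduced here, and the sample pipelines are made up for illustration. It applies the same rule, treating a pipeline as a write when any stage has `$out` or `$merge` as a string or symbol key.

```ruby
# Hypothetical helper mirroring the stage check in #write?; not part of the driver.
WRITE_STAGES = %w[$out $merge].freeze

def writes_to_collection?(pipeline)
  pipeline.any? do |stage|
    # Accept both string keys ('$out') and symbol keys (:$out).
    WRITE_STAGES.any? { |name| stage.key?(name) || stage.key?(name.to_sym) }
  end
end

# Illustrative pipelines (made-up field and collection names).
read_only  = [{ '$match' => { status: 'active' } }, { '$group' => { _id: '$city' } }]
with_out   = [{ '$match' => { status: 'active' } }, { :$out => 'active_users' }]
with_merge = [{ '$merge' => { into: 'report' } }]
```

Only top-level stage keys are inspected, so a `$out` nested inside another stage's arguments would not be detected by this sketch.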
Instance Method Details
#aggregate_spec(session, read_preference) (private)
# File 'lib/mongo/collection/view/aggregation.rb', line 134
def aggregate_spec(session, read_preference)
  Builder::Aggregation.new(
    pipeline,
    view,
    options.merge(session: session, read_preference: read_preference)
  ).specification
end
#allow_disk_use(value = nil) ⇒ true, ...
Set to true if disk usage is allowed during the aggregation.
# File 'lib/mongo/collection/view/aggregation.rb', line 62
def allow_disk_use(value = nil)
  configure(:allow_disk_use, value)
end
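The optional argument suggests that the method works as both a getter and a setter. The sketch below illustrates that pattern under an assumption: that `configure` returns the current option when `value` is nil and otherwise returns a new object with the option merged in. This page does not show `configure`, so treat the behavior as a guess; `SketchAggregation` is a hypothetical stand-in class, not the driver's.

```ruby
# Hypothetical stand-in for the aggregation object. The real class delegates to
# a private configure helper whose behavior is assumed here, not documented above.
class SketchAggregation
  attr_reader :pipeline, :options

  def initialize(pipeline, options = {})
    @pipeline = pipeline
    @options = options.dup.freeze
  end

  # With no argument, report the current setting; with an argument, return a
  # new object carrying the updated option and leave the receiver unchanged.
  def allow_disk_use(value = nil)
    return options[:allow_disk_use] if value.nil?

    self.class.new(pipeline, options.merge(allow_disk_use: value))
  end
end

base    = SketchAggregation.new([{ '$sort' => { total: -1 } }])
on_disk = base.allow_disk_use(true)
```

Under that assumption, `base` keeps its original options while `on_disk` carries the new setting, which is consistent with the class including Immutable.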
#cache_options (private)
Skip, sort, limit, projection are specified as pipeline stages rather than as options.
# File 'lib/mongo/collection/view/aggregation.rb', line 197
def cache_options
  {
    namespace: collection.namespace,
    selector: pipeline,
    read_concern: view.read_concern,
    read_preference: view.read_preference,
    collation: options[:collation],
    # Aggregations can read documents from more than one collection,
    # so they will be cleared on every write operation.
    multi_collection: true,
  }
end
#effective_read_preference(connection) ⇒ Hash | nil (private)
Return effective read preference for the operation.
If the pipeline contains $merge or $out, the read preference specified by the user is secondary or secondary_preferred, and the target server is below 5.0, then this method returns the primary read preference, because the aggregation will be routed to the primary. Otherwise it returns the original read preference.
# File 'lib/mongo/collection/view/aggregation.rb', line 164
def effective_read_preference(connection)
  return unless view.read_preference
  return view.read_preference unless write?
  return view.read_preference unless [:secondary, :secondary_preferred].include?(view.read_preference[:mode])

  primary_read_preference = {mode: :primary}
  description = connection.description
  if description.primary?
    log_warn("Routing the Aggregation operation to the primary server")
    primary_read_preference
  elsif description.mongos? && !description.features.merge_out_on_secondary_enabled?
    log_warn("Routing the Aggregation operation to the primary server")
    primary_read_preference
  else
    view.read_preference
  end
end
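The branching above can be exercised without a server. The following is a rough sketch of the same decision logic as a plain function. `SketchDescription` and `sketch_effective_read_preference` are hypothetical names introduced here, and the three boolean fields stand in for `description.primary?`, `description.mongos?`, and `description.features.merge_out_on_secondary_enabled?`. Logging is omitted.

```ruby
# Hypothetical stand-in for the server description consulted by the method.
SketchDescription = Struct.new(:primary, :mongos, :merge_out_on_secondary, keyword_init: true)

def sketch_effective_read_preference(read_preference, writes, description)
  return nil unless read_preference
  return read_preference unless writes
  return read_preference unless %i[secondary secondary_preferred].include?(read_preference[:mode])

  # A writing pipeline with a secondary-leaning preference is rerouted when the
  # connected server is a primary, or a mongos without secondary $out/$merge support.
  if description.primary || (description.mongos && !description.merge_out_on_secondary)
    { mode: :primary }
  else
    read_preference
  end
end

secondary  = { mode: :secondary }
old_mongos = SketchDescription.new(primary: false, mongos: true, merge_out_on_secondary: false)
new_mongos = SketchDescription.new(primary: false, mongos: true, merge_out_on_secondary: true)
primary    = SketchDescription.new(primary: true, mongos: false, merge_out_on_secondary: true)
```

In this sketch only a writing pipeline with a secondary or secondary_preferred mode is ever rewritten; every other combination returns the read preference it was given.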
#explain ⇒ Hash
Get the explain plan for the aggregation.
#initial_query_op(session, read_preference) (private)
# File 'lib/mongo/collection/view/aggregation.rb', line 146
def initial_query_op(session, read_preference)
  Operation::Aggregate.new(aggregate_spec(session, read_preference))
end
#new(options) (private)
#send_initial_query(server, session) (private)
# File 'lib/mongo/collection/view/aggregation.rb', line 183
def send_initial_query(server, session)
  server.with_connection do |connection|
    initial_query_op(
      session,
      effective_read_preference(connection)
    ).execute_with_connection(
      connection,
      context: Operation::Context.new(client: client, session: session)
    )
  end
end
#server_selector (private)
# File 'lib/mongo/collection/view/aggregation.rb', line 130
def server_selector
  @view.send(:server_selector)
end