Aggregation Pipeline

Mongoid exposes MongoDB's aggregation pipeline, which chains operations (stages) that process documents and return results. The aggregation pipeline provides a superset of the functionality of the deprecated map/reduce framework.

Basic Usage

Querying Across Multiple Collections [aggregation-pipeline-example-multiple-collections]

The aggregation pipeline may be used for queries involving multiple referenced associations at the same time:

class Band
  include Mongoid::Document
  has_many :tours
  has_many :awards
  field :name, type: String
end

class Tour
  include Mongoid::Document
  belongs_to :band
  field :year, type: Integer
end

class Award
  include Mongoid::Document
  belongs_to :band
  field :name, type: String
end

To retrieve bands that have toured since 2000 and have at least one award, one could do the following:

band_ids = Band.collection.aggregate([
  { '$lookup' => {
    from: 'tours',
    localField: '_id',
    foreignField: 'band_id',
    as: 'tours',
  } },
  { '$lookup' => {
    from: 'awards',
    localField: '_id',
    foreignField: 'band_id',
    as: 'awards',
  } },
  { '$match' => {
    'tours.year' => {'$gte' => 2000},
    'awards._id' => {'$exists' => true},
  } },
  {'$project' => {_id: 1}},
])
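# Each result is a raw document of the form {"_id" => ...}, so extract
# the id values before handing them to Mongoid.
bands = Band.find(band_ids.map { |doc| doc['_id'] })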

Note that the aggregation pipeline, since it is implemented by the Ruby driver for MongoDB and not by Mongoid, returns raw BSON::Document objects rather than ::Mongoid::Document model instances. The above example projects only the _id field, which is then used to load full models. An alternative is to skip the projection and work with the raw fields, which avoids sending a potentially large list of document ids in a second query.
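As a sketch of the alternative mentioned above, the final $project stage can be dropped and the joined fields read directly from each result. The helper names touring_award_winners_pipeline and summarize_band are hypothetical, and sample_doc is made-up data that only stands in for a BSON::Document that Band.collection.aggregate would yield against a live database.

```ruby
# Hypothetical helper: the same pipeline as above, minus the final
# $project stage, so each result keeps its name, tours and awards.
def touring_award_winners_pipeline(since_year)
  [
    { '$lookup' => { from: 'tours', localField: '_id',
                     foreignField: 'band_id', as: 'tours' } },
    { '$lookup' => { from: 'awards', localField: '_id',
                     foreignField: 'band_id', as: 'awards' } },
    { '$match' => {
      'tours.year' => { '$gte' => since_year },
      'awards._id' => { '$exists' => true },
    } },
  ]
end

# Hypothetical helper: reads raw fields from one result document.
# No model is instantiated and no second query is issued.
def summarize_band(doc)
  {
    name: doc['name'],
    tour_years: doc['tours'].map { |tour| tour['year'] },
    award_names: doc['awards'].map { |award| award['name'] },
  }
end

# With a live connection the documents would come from:
#   Band.collection.aggregate(touring_award_winners_pipeline(2000))
# The hash below is made-up sample data in the same shape.
sample_doc = {
  'name' => 'Example Band',
  'tours' => [{ 'year' => 1998 }, { 'year' => 2004 }],
  'awards' => [{ 'name' => 'Example Award' }],
}

summary = summarize_band(sample_doc)
```

The trade-off is that the results are plain hash-like documents, so model methods, defaults, and type conversions defined on Band are not available.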

Builder DSL [aggregation-pipeline-builder-dsl]

Mongoid provides limited support for constructing the aggregation pipeline itself using a high-level DSL. The following aggregation pipeline operators are supported: $group, $project, and $unwind.

To construct a pipeline, call the corresponding aggregation pipeline methods on a Criteria instance. Each call adds a stage to the pipeline attribute of the Criteria instance. To execute the pipeline, pass the pipeline attribute value to the Collection#aggregate method.

For example, given the following models:

class Tour
  include Mongoid::Document

  embeds_many :participants

  field :name, type: String
  field :states, type: Array
end

class Participant
  include Mongoid::Document

  embedded_in :tour

  field :name, type: String
end

We can find out which states a participant visited:

criteria = Tour.where('participants.name' => 'Serenity').
  unwind(:states).
  group(_id: 'states', :states.add_to_set => '$states').
  project(_id: 0, states: 1)

pp criteria.pipeline
# => [{"$match"=>{"participants.name"=>"Serenity"}},
#     {"$unwind"=>"$states"},
#     {"$group"=>{"_id"=>"states", "states"=>{"$addToSet"=>"$states"}}},
#     {"$project"=>{"_id"=>0, "states"=>1}}]

Tour.collection.aggregate(criteria.pipeline).to_a
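To make the effect of each stage concrete, the following plain-Ruby sketch emulates the pipeline above on in-memory hashes. It is not how MongoDB or Mongoid executes the pipeline; the emulate_states_pipeline helper and the sample tours are made up for illustration.

```ruby
# Hypothetical in-memory emulation of the pipeline shown above.
# Each step mirrors one stage; MongoDB does this work server-side.
def emulate_states_pipeline(tours, participant_name)
  # $match: keep tours with at least one participant of the given name.
  matched = tours.select do |tour|
    tour['participants'].any? { |p| p['name'] == participant_name }
  end

  # $unwind: emit one document per element of the states array.
  unwound = matched.flat_map do |tour|
    tour['states'].map { |state| tour.merge('states' => state) }
  end

  # $group produces no output when it receives no input documents.
  return [] if unwound.empty?

  # $group with the constant _id 'states': every document falls into a
  # single group, and $addToSet collects the distinct state values.
  # (MongoDB does not guarantee the order of the resulting set.)
  grouped = [{ '_id' => 'states',
               'states' => unwound.map { |doc| doc['states'] }.uniq }]

  # $project: drop _id and keep only states.
  grouped.map { |doc| { 'states' => doc['states'] } }
end

# Made-up sample data in the shape of the Tour model above.
sample_tours = [
  { 'name' => 'East', 'states' => %w[NY MA],
    'participants' => [{ 'name' => 'Serenity' }] },
  { 'name' => 'West', 'states' => %w[CA NY],
    'participants' => [{ 'name' => 'Serenity' }, { 'name' => 'Roxanne' }] },
  { 'name' => 'South', 'states' => %w[TX],
    'participants' => [{ 'name' => 'Roxanne' }] },
]

result = emulate_states_pipeline(sample_tours, 'Serenity')
# result holds one document whose states are NY, MA and CA.
```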

group

The group method adds a $group aggregation pipeline stage.

The field expressions support Mongoid symbol-operator syntax:

criteria = Tour.all.group(_id: 'states', :states.add_to_set => '$states')
criteria.pipeline
# => [{"$group"=>{"_id"=>"states", "states"=>{"$addToSet"=>"$states"}}}]

Alternatively, standard MongoDB aggregation pipeline syntax may be used:

criteria = Tour.all.group(_id: 'states', states: {'$addToSet' => '$states'})
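Note that in the examples above _id is the literal string 'states', not a field reference, so all documents are collected into a single group. As a sketch of grouping by a field value instead, _id can be a $-prefixed field path, here the name field of the Tour model. The helper group_states_by is hypothetical and only builds a plain Ruby hash in standard MongoDB syntax.

```ruby
# Hypothetical helper: builds the argument for a $group stage that
# collects the distinct states for each value of the given field.
def group_states_by(field)
  { _id: '$' + field.to_s, states: { '$addToSet' => '$states' } }
end

per_tour = group_states_by(:name)

# Assumed usage: pass the hash to the group method after unwinding,
#   Tour.all.unwind(:states).group(per_tour)
# or wrap it by hand for Collection#aggregate:
stage = { '$group' => per_tour }
```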

project

The project method adds a $project aggregation pipeline stage.

The argument should be a Hash specifying the projection:

criteria = Tour.all.project(_id: 0, states: 1)
criteria.pipeline
# => [{"$project"=>{"_id"=>0, "states"=>1}}]

unwind [unwind-dsl]

The unwind method adds an $unwind aggregation pipeline stage.

The argument can be a field name, given as a symbol or a string, or a Hash or BSON::Document instance specifying the stage in full:

criteria = Tour.all.unwind(:states)
criteria = Tour.all.unwind('states')
criteria.pipeline
# => [{"$unwind"=>"$states"}]

criteria = Tour.all.unwind(path: '$states')
criteria.pipeline
# => [{"$unwind"=>{:path=>"$states"}}]
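The Hash form leaves room for the additional options that MongoDB's $unwind stage accepts, such as preserveNullAndEmptyArrays. As a rough illustration of what that option changes, the following plain-Ruby sketch emulates $unwind on in-memory hashes. The emulate_unwind helper and the sample documents are hypothetical, and the emulation only approximates the server's handling of missing, null, and empty arrays.

```ruby
# Hypothetical in-memory approximation of the $unwind stage.
# field:    name of the array field, without the leading "$".
# preserve: mimics the preserveNullAndEmptyArrays option.
def emulate_unwind(docs, field, preserve: false)
  docs.flat_map do |doc|
    values = doc[field]
    if values.nil? || values == []
      # By default, documents with a missing, null or empty array
      # are dropped from the output.
      next [] unless preserve
      # With the option set they are kept; an empty array field is
      # removed, while a missing or null field is left untouched.
      next [values.nil? ? doc : doc.reject { |key, _| key == field }]
    end
    # A non-array value is treated as a single-element array.
    elements = values.is_a?(Array) ? values : [values]
    elements.map { |value| doc.merge(field => value) }
  end
end

# Made-up sample documents.
docs = [
  { 'name' => 'A', 'states' => %w[NY CA] },
  { 'name' => 'B', 'states' => [] },
  { 'name' => 'C' },
]

default_result   = emulate_unwind(docs, 'states')
preserved_result = emulate_unwind(docs, 'states', preserve: true)

# Assumed Mongoid usage for the second case, relying on the Hash form
# shown above:
#   Tour.all.unwind(path: '$states', preserveNullAndEmptyArrays: true)
```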