123456789_123456789_123456789_123456789_123456789_

Class: Mongo::Collection::View::MapReduce

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
self, Forwardable
Instance Chain:
Inherits: Object
Defined in: lib/mongo/collection/view/map_reduce.rb

Overview

Provides behavior around a map/reduce operation on the collection view.

Since:

  • 2.0.0

Constant Summary

::Mongo::Loggable - Included

PREFIX

Class Method Summary

Instance Attribute Summary

Instance Method Summary

::Mongo::Retryable - Included

#read_worker

Returns the read worker for handling retryable reads.

#select_server

This is a separate method to make it possible for the test suite to assert that server selection is performed during retry attempts.

#write_worker

Returns the write worker for handling retryable writes.

::Mongo::Loggable - Included

#log_debug

Convenience method to log debug messages with the standard prefix.

#log_error

Convenience method to log error messages with the standard prefix.

#log_fatal

Convenience method to log fatal messages with the standard prefix.

#log_info

Convenience method to log info messages with the standard prefix.

#log_warn

Convenience method to log warn messages with the standard prefix.

#logger

Get the logger instance.

#_mongo_log_prefix, #format_message

Immutable - Included

Constructor Details

.new(view, map, reduce, options = {}) ⇒ MapReduce

Initialize the map/reduce for the provided collection view, functions and options.

Examples:

Create the new map/reduce view.

Parameters:

  • view (Collection::View)

    The collection view.

  • map (String)

    The map function.

  • reduce (String)

    The reduce function.

  • options (Hash) (defaults to: {})

    The map/reduce options.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 121

def initialize(view, map, reduce, options = {})
  @view = view
  @map_function = map.dup.freeze
  @reduce_function = reduce.dup.freeze
  @options = BSON::Document.new(options).freeze

  client.log_warn('The map_reduce operation is deprecated, please use the aggregation pipeline instead')
end

Instance Attribute Details

#inline?Boolean (readonly, private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 249

def inline?
  out.nil? || out == { inline: 1 } || out == { INLINE => 1 }
end

#map_functionString (readonly)

Returns:

  • (String)

    map The map function.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 48

attr_reader :map_function

#reduce_functionString (readonly)

Returns:

  • (String)

    reduce The reduce function.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 51

attr_reader :reduce_function

#secondary_ok?Boolean (readonly, private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 286

def secondary_ok?
  out.respond_to?(:keys) && out.keys.first.to_s.downcase == INLINE
end

#viewView (readonly)

Returns:

  • (View)

    view The collection view.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 45

attr_reader :view

Instance Method Details

#each {|Each| ... } ⇒ Enumerator

Iterate through documents returned by the map/reduce.

Examples:

Iterate through the result of the map/reduce.

map_reduce.each do |document|
  p document
end

Yield Parameters:

  • Each (Hash)

    matching document.

Returns:

  • (Enumerator)

    The enumerator.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 71

def each
  @cursor = nil
  session = client.get_session(@options)
  server = cluster.next_primary(nil, session)
  context = Operation::Context.new(client: client, session: session, operation_timeouts: view.operation_timeouts)
  if server.load_balancer?
    # Connection will be checked in when cursor is drained.
    connection = server.pool.check_out(context: context)
    result = send_initial_query_with_connection(connection, context.session, context: context)
    result = send_fetch_query_with_connection(connection, session) unless inline?
  else
    result = send_initial_query(server, context)
    result = send_fetch_query(server, session) unless inline?
  end
  @cursor = Cursor.new(view, result, server, session: session)
  if block_given?
    @cursor.each do |doc|
      yield doc
    end
  else
    @cursor.to_enum
  end
end

#executeMongo::Operation::Result

Execute the map reduce, without doing a fetch query to retrieve the results

if outputted to a collection.

Examples:

Execute the map reduce and get the raw result.

map_reduce.execute

Returns:

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 231

def execute
  view.send(:with_session, @options) do |session|
    write_concern = view.write_concern_with_session(session)
    context = Operation::Context.new(client: client, session: session)
    nro_write_with_retry(write_concern, context: context) do |connection, txn_num, context|
      send_initial_query_with_connection(connection, session, context: context)
    end
  end
end

#fetch_query_op(session) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 316

def fetch_query_op(session)
  spec = {
    coll_name: out_collection_name,
    db_name: out_database_name,
    filter: {},
    session: session,
    read: read,
    read_concern: options[:read_concern] || collection.read_concern,
    collation: options[:collation] || view.options[:collation],
  }
  Operation::Find.new(spec)
end

#fetch_query_spec (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 308

def fetch_query_spec
  Builder::MapReduce.new(map_function, reduce_function, view, options).query_specification
end

#finalize(function = nil) ⇒ MapReduce, String

Set or get the finalize function for the operation.

Examples:

Set the finalize function.

map_reduce.finalize(function)

Parameters:

  • function (String) (defaults to: nil)

    The finalize js function.

Returns:

  • (MapReduce, String)

    The new MapReduce operation or the value of the function.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 106

def finalize(function = nil)
  configure(:finalize, function)
end

#find_command_spec(session) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 312

def find_command_spec(session)
  Builder::MapReduce.new(map_function, reduce_function, view, options.merge(session: session)).command_specification
end

#initial_query_op(session) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 261

def initial_query_op(session)
  spec = map_reduce_spec(session)
  # Read preference isn't simply passed in the command payload
  # (it may need to be converted to wire protocol flags).
  # Passing it in command payload produces errors on at least
  # 5.0 mongoses.
  # In the future map_reduce_command should remove :read
  # from its return value, however we cannot do this right now
  # due to Mongoid 7 relying on :read being returned as part of
  # the command - see RUBY-2932.
  # Delete :read here for now because it cannot be sent to mongos this way.
  spec = spec.dup
  spec[:selector] = spec[:selector].dup
  spec[:selector].delete(:read)
  Operation::MapReduce.new(spec)
end

#js_mode(value = nil) ⇒ MapReduce, ...

Set or get the jsMode flag for the operation.

Examples:

Set js mode for the operation.

map_reduce.js_mode(true)

Parameters:

  • value (true, false) (defaults to: nil)

    The jsMode value.

Returns:

  • (MapReduce, true, false)

    The new MapReduce operation or the value of the jsMode flag.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 141

def js_mode(value = nil)
  configure(:js_mode, value)
end

#map_reduce_spec(session = nil) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 253

def map_reduce_spec(session = nil)
  Builder::MapReduce.new(map_function, reduce_function, view, options.merge(session: session)).specification
end

#new(options) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 257

def new(options)
  MapReduce.new(view, map_function, reduce_function, options)
end

#out(location = nil) ⇒ MapReduce, Hash

Set or get the output location for the operation.

Examples:

Set the output to inline.

map_reduce.out(inline: 1)

Set the output collection to merge.

map_reduce.out(merge: 'users')

Set the output collection to replace.

map_reduce.out(replace: 'users')

Set the output collection to reduce.

map_reduce.out(reduce: 'users')

Parameters:

  • location (Hash) (defaults to: nil)

    The output location details.

Returns:

  • (MapReduce, Hash)

    The new MapReduce operation or the value of the output location.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 165

def out(location = nil)
  configure(:out, location)
end

#out_collection_name

Returns the collection name where the map-reduce result is written to. If the result is returned inline, returns nil.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 171

def out_collection_name
  if options[:out].respond_to?(:keys)
    options[:out][OUT_ACTIONS.find do |action|
      options[:out][action]
    end]
  end || options[:out]
end

#out_database_name

Returns the database name where the map-reduce result is written to. If the result is returned inline, returns nil.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 181

def out_database_name
  if options[:out]
    if options[:out].respond_to?(:keys) && (db = options[:out][:db])
      db
    else
      database.name
    end
  end
end

#scope(object = nil) ⇒ MapReduce, Hash

Set or get a scope on the operation.

Examples:

Set the scope value.

map_reduce.scope(value: 'test')

Parameters:

  • object (Hash) (defaults to: nil)

    The scope object.

Returns:

  • (MapReduce, Hash)

    The new MapReduce operation or the value of the scope.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 202

def scope(object = nil)
  configure(:scope, object)
end

#send_fetch_query(server, session) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 329

def send_fetch_query(server, session)
  fetch_query_op(session).execute(server, context: Operation::Context.new(client: client, session: session))
end

#send_fetch_query_with_connection(connection, session) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 333

def send_fetch_query_with_connection(connection, session)
  fetch_query_op(
    session
  ).execute_with_connection(
    connection,
    context: Operation::Context.new(client: client, session: session)
  )
end

#send_initial_query(server, context) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 290

def send_initial_query(server, context)
  server.with_connection do |connection|
    send_initial_query_with_connection(connection, context.session, context: context)
  end
end

#send_initial_query_with_connection(connection, session, context:) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 296

def send_initial_query_with_connection(connection, session, context:)
  op = initial_query_op(session)
  if valid_server?(connection.description)
    op.execute_with_connection(connection, context: context)
  else
    msg = "Rerouting the MapReduce operation to the primary server - #{connection.address} is not suitable because it is not currently the primray"
    log_warn(msg)
    server = cluster.next_primary(nil, session)
    op.execute(server, context: context)
  end
end

#server_selector (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 245

def server_selector
  @view.send(:server_selector)
end

#valid_server?(description) ⇒ Boolean (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 278

def valid_server?(description)
  if secondary_ok?
    true
  else
    description.standalone? || description.mongos? || description.primary? || description.load_balancer?
  end
end

#verbose(value = nil) ⇒ MapReduce, Hash

Whether to include the timing information in the result.

Examples:

Set the verbose value.

map_reduce.verbose(false)

Parameters:

  • value (true, false) (defaults to: nil)

    Whether to include timing information in the result.

Returns:

  • (MapReduce, Hash)

    The new MapReduce operation or the value of the verbose option.

Since:

  • 2.0.5

[ GitHub ]

  
# File 'lib/mongo/collection/view/map_reduce.rb', line 218

def verbose(value = nil)
  configure(:verbose, value)
end