Class: Mongo::Collection::View::MapReduce
Relationships & Source Files | |
Super Chains via Extension / Inclusion / Inheritance | |
Class Chain:
self,
Forwardable
|
|
Instance Chain:
|
|
Inherits: | Object |
Defined in: | lib/mongo/collection/view/map_reduce.rb |
Overview
Provides behavior around a map/reduce operation on the collection view.
Constant Summary
-
INLINE =
The inline option.
'inline'.freeze
-
OUT_ACTIONS =
[ :replace, :merge, :reduce ].freeze
-
REROUTE =
Deprecated.
Reroute message.
'Rerouting the MapReduce operation to the primary server.'.freeze
::Mongo::Loggable
- Included
Class Method Summary
-
.new(view, map, reduce, options = {}) ⇒ MapReduce
constructor
Initialize the map/reduce for the provided collection view, functions and options.
Instance Attribute Summary
- #map_function ⇒ String readonly
- #reduce_function ⇒ String readonly
- #view ⇒ View readonly
- #inline? ⇒ Boolean readonly private
- #secondary_ok? ⇒ Boolean readonly private
Immutable
- Included
Instance Method Summary
-
#each {|Each| ... } ⇒ Enumerator
Iterate through documents returned by the map/reduce.
-
#execute ⇒ Mongo::Operation::Result
Execute the map reduce, without doing a fetch query to retrieve the results.
-
#finalize(function = nil) ⇒ MapReduce, String
Set or get the finalize function for the operation.
-
#js_mode(value = nil) ⇒ MapReduce, ...
Set or get the jsMode flag for the operation.
-
#out(location = nil) ⇒ MapReduce, Hash
Set or get the output location for the operation.
-
#out_collection_name
Returns the collection name where the map-reduce result is written to.
-
#out_database_name
Returns the database name where the map-reduce result is written to.
-
#scope(object = nil) ⇒ MapReduce, Hash
Set or get a scope on the operation.
-
#verbose(value = nil) ⇒ MapReduce, Hash
Whether to include the timing information in the result.
- #fetch_query_op(session) private
- #fetch_query_spec private
- #find_command_spec(session) private
- #initial_query_op(session) private
- #map_reduce_spec(session = nil) private
- #new(options) private
- #send_fetch_query(server, session) private
- #send_fetch_query_with_connection(connection, session) private
- #send_initial_query(server, context) private
- #send_initial_query_with_connection(connection, session, context:) private
- #server_selector private
- #valid_server?(description) ⇒ Boolean private
::Mongo::Retryable
- Included
#read_worker | Returns the read worker for handling retryable reads. |
#select_server | This is a separate method to make it possible for the test suite to assert that server selection is performed during retry attempts. |
#write_worker | Returns the write worker for handling retryable writes. |
::Mongo::Loggable
- Included
#log_debug | Convenience method to log debug messages with the standard prefix. |
#log_error | Convenience method to log error messages with the standard prefix. |
#log_fatal | Convenience method to log fatal messages with the standard prefix. |
#log_info | Convenience method to log info messages with the standard prefix. |
#log_warn | Convenience method to log warn messages with the standard prefix. |
#logger | Get the logger instance. |
#_mongo_log_prefix, #format_message |
Immutable
- Included
Constructor Details
.new(view, map, reduce, options = {}) ⇒ MapReduce
Initialize the map/reduce for the provided collection view, functions and options.
# File 'lib/mongo/collection/view/map_reduce.rb', line 121
def initialize(view, map, reduce, = {}) @view = view @map_function = map.dup.freeze @reduce_function = reduce.dup.freeze @options = BSON::Document.new( ).freeze client.log_warn('The map_reduce operation is deprecated, please use the aggregation pipeline instead') end
Instance Attribute Details
#inline? ⇒ Boolean
(readonly, private)
#map_function ⇒ String
(readonly)
# File 'lib/mongo/collection/view/map_reduce.rb', line 48
attr_reader :map_function
#reduce_function ⇒ String
(readonly)
# File 'lib/mongo/collection/view/map_reduce.rb', line 51
attr_reader :reduce_function
#secondary_ok? ⇒ Boolean
(readonly, private)
#view ⇒ View (readonly)
# File 'lib/mongo/collection/view/map_reduce.rb', line 45
attr_reader :view
Instance Method Details
#each {|Each| ... } ⇒ Enumerator
Iterate through documents returned by the map/reduce.
# File 'lib/mongo/collection/view/map_reduce.rb', line 71
def each @cursor = nil session = client.get_session(@options) server = cluster.next_primary(nil, session) context = Operation::Context.new(client: client, session: session, operation_timeouts: view.operation_timeouts) if server.load_balancer? # Connection will be checked in when cursor is drained. connection = server.pool.check_out(context: context) result = send_initial_query_with_connection(connection, context.session, context: context) result = send_fetch_query_with_connection(connection, session) unless inline? else result = send_initial_query(server, context) result = send_fetch_query(server, session) unless inline? end @cursor = Cursor.new(view, result, server, session: session) if block_given? @cursor.each do |doc| yield doc end else @cursor.to_enum end end
#execute ⇒ Mongo::Operation::Result
Execute the map reduce, without doing a fetch query to retrieve the results
if outputted to a collection.
# File 'lib/mongo/collection/view/map_reduce.rb', line 231
def execute view.send(:with_session, @options) do |session| write_concern = view.write_concern_with_session(session) context = Operation::Context.new(client: client, session: session) nro_write_with_retry(write_concern, context: context) do |connection, txn_num, context| send_initial_query_with_connection(connection, session, context: context) end end end
#fetch_query_op(session) (private)
# File 'lib/mongo/collection/view/map_reduce.rb', line 316
def fetch_query_op(session) spec = { coll_name: out_collection_name, db_name: out_database_name, filter: {}, session: session, read: read, read_concern: [:read_concern] || collection.read_concern, collation: [:collation] || view. [:collation], } Operation::Find.new(spec) end
#fetch_query_spec (private)
# File 'lib/mongo/collection/view/map_reduce.rb', line 308
def fetch_query_spec Builder::MapReduce.new(map_function, reduce_function, view, ).query_specification end
#finalize(function = nil) ⇒ MapReduce
, String
Set or get the finalize function for the operation.
# File 'lib/mongo/collection/view/map_reduce.rb', line 106
def finalize(function = nil) configure(:finalize, function) end
#find_command_spec(session) (private)
# File 'lib/mongo/collection/view/map_reduce.rb', line 312
def find_command_spec(session) Builder::MapReduce.new(map_function, reduce_function, view, .merge(session: session)).command_specification end
#initial_query_op(session) (private)
# File 'lib/mongo/collection/view/map_reduce.rb', line 261
def initial_query_op(session) spec = map_reduce_spec(session) # Read preference isn't simply passed in the command payload # (it may need to be converted to wire protocol flags). # Passing it in command payload produces errors on at least # 5.0 mongoses. # In the future map_reduce_command should remove :read # from its return value, however we cannot do this right now # due to Mongoid 7 relying on :read being returned as part of # the command - see RUBY-2932. # Delete :read here for now because it cannot be sent to mongos this way. spec = spec.dup spec[:selector] = spec[:selector].dup spec[:selector].delete(:read) Operation::MapReduce.new(spec) end
#js_mode(value = nil) ⇒ MapReduce
, ...
Set or get the jsMode flag for the operation.
# File 'lib/mongo/collection/view/map_reduce.rb', line 141
def js_mode(value = nil) configure(:js_mode, value) end
#map_reduce_spec(session = nil) (private)
# File 'lib/mongo/collection/view/map_reduce.rb', line 253
def map_reduce_spec(session = nil) Builder::MapReduce.new(map_function, reduce_function, view, .merge(session: session)).specification end
#new(options) (private)
# File 'lib/mongo/collection/view/map_reduce.rb', line 257
def new( ) MapReduce.new(view, map_function, reduce_function, ) end
#out(location = nil) ⇒ MapReduce
, Hash
Set or get the output location for the operation.
# File 'lib/mongo/collection/view/map_reduce.rb', line 165
def out(location = nil) configure(:out, location) end
#out_collection_name
Returns the collection name where the map-reduce result is written to. If the result is returned inline, returns nil.
#out_database_name
Returns the database name where the map-reduce result is written to. If the result is returned inline, returns nil.
#scope(object = nil) ⇒ MapReduce
, Hash
Set or get a scope on the operation.
# File 'lib/mongo/collection/view/map_reduce.rb', line 202
def scope(object = nil) configure(:scope, object) end
#send_fetch_query(server, session) (private)
# File 'lib/mongo/collection/view/map_reduce.rb', line 329
def send_fetch_query(server, session) fetch_query_op(session).execute(server, context: Operation::Context.new(client: client, session: session)) end
#send_fetch_query_with_connection(connection, session) (private)
# File 'lib/mongo/collection/view/map_reduce.rb', line 333
def send_fetch_query_with_connection(connection, session) fetch_query_op( session ).execute_with_connection( connection, context: Operation::Context.new(client: client, session: session) ) end
#send_initial_query(server, context) (private)
# File 'lib/mongo/collection/view/map_reduce.rb', line 290
def send_initial_query(server, context) server.with_connection do |connection| send_initial_query_with_connection(connection, context.session, context: context) end end
#send_initial_query_with_connection(connection, session, context:) (private)
# File 'lib/mongo/collection/view/map_reduce.rb', line 296
def send_initial_query_with_connection(connection, session, context:) op = initial_query_op(session) if valid_server?(connection.description) op.execute_with_connection(connection, context: context) else msg = "Rerouting the MapReduce operation to the primary server - #{connection.address} is not suitable because it is not currently the primray" log_warn(msg) server = cluster.next_primary(nil, session) op.execute(server, context: context) end end
#server_selector (private)
# File 'lib/mongo/collection/view/map_reduce.rb', line 245
def server_selector @view.send(:server_selector) end
#valid_server?(description) ⇒ Boolean
(private)
# File 'lib/mongo/collection/view/map_reduce.rb', line 278
def valid_server?(description) if secondary_ok? true else description.standalone? || description.mongos? || description.primary? || description.load_balancer? end end
#verbose(value = nil) ⇒ MapReduce
, Hash
Whether to include the timing information in the result.
# File 'lib/mongo/collection/view/map_reduce.rb', line 218
def verbose(value = nil) configure(:verbose, value) end