Class: Mongo::BulkWrite
| Class Chain: | self, Forwardable |
| Inherits: | Object |
| Defined in: | lib/mongo/bulk_write.rb, lib/mongo/bulk_write/combineable.rb, lib/mongo/bulk_write/ordered_combiner.rb, lib/mongo/bulk_write/result.rb, lib/mongo/bulk_write/result_combiner.rb, lib/mongo/bulk_write/transformable.rb, lib/mongo/bulk_write/unordered_combiner.rb, lib/mongo/bulk_write/validatable.rb |
Constant Summary
- SINGLE_STATEMENT_OPS =
  # File 'lib/mongo/bulk_write.rb', line 172
  %i[delete_one update_one insert_one].freeze
Class Method Summary
- .new(collection, requests, options = {}) ⇒ BulkWrite constructor Internal use only
  Create the new bulk write operation.
Instance Attribute Summary
- #collection ⇒ Mongo::Collection readonly
- #options ⇒ Hash, BSON::Document readonly
- #ordered? ⇒ true, false readonly Internal use only
  Is the bulk write ordered?
- #requests ⇒ Array<Hash, BSON::Document> readonly
Instance Method Summary
- #execute ⇒ Mongo::BulkWrite::Result
  Execute the bulk write operation.
- #write_concern(session = nil) ⇒ WriteConcern Internal use only
  Get the write concern for the bulk write.
- #base_spec(operation_id, session) private
- #calculate_deadline ⇒ Float | nil private
- #can_hint?(connection) ⇒ true | false private
  Loop through the requests and check whether each operation is allowed to send a hint to the given server version.
- #delete_many(documents, connection, context, operation_id, session, _txn_num) private
- #delete_one(documents, connection, context, operation_id, session, txn_num) private
- #execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil) private
- #insert_one(documents, connection, context, operation_id, session, txn_num) private
- #maybe_first(obj) ⇒ Object private
  If the given object is an array, return the first element; otherwise return the given object.
- #op_combiner private
- #op_timeout_ms(deadline) ⇒ Integer | nil private
- #operation_timeouts(deadline) private
  Returns the operation_timeouts hash for creating an Operation::Context.
- #replace_one(documents, connection, context, operation_id, session, txn_num) private
  Alias for #update_one.
- #single_statement?(operation) ⇒ Boolean private
- #split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num) private
- #update_many(documents, connection, context, operation_id, session, _txn_num) private
- #update_one(documents, connection, context, operation_id, session, txn_num) (also: #replace_one) private
- #validate_hint!(connection) private
- #validate_requests! private
  Perform the request document validation required by driver specifications.
Operation::ResponseHandling - Included
| #add_error_labels | Adds error labels to exceptions raised in the yielded to block, which should perform MongoDB operations and raise Mongo::Errors on failure. |
| #add_server_diagnostics | Yields to the block and, if the block raises an exception, adds a note to the exception with the address of the specified server. |
| #maybe_add_retryable_write_error_label! | A method that will add the RetryableWriteError label to an error if any of the following conditions are true: |
| #unpin_maybe | Unpins the session and/or the connection if the yielded to block raises errors that are required to unpin the session and the connection. |
| #validate_result | |
Instance Attribute Details
#collection ⇒ Mongo::Collection (readonly)
# File 'lib/mongo/bulk_write.rb', line 31
attr_reader :collection
#options ⇒ Hash, BSON::Document (readonly)
# File 'lib/mongo/bulk_write.rb', line 37
attr_reader :options
#ordered? ⇒ true, false (readonly)
Is the bulk write ordered?
# File 'lib/mongo/bulk_write.rb', line 148
def ordered?
  @ordered ||= options.fetch(:ordered, true)
end
#requests ⇒ Array<Hash, BSON::Document> (readonly)
# File 'lib/mongo/bulk_write.rb', line 34
attr_reader :requests
Instance Method Details
#base_spec(operation_id, session) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 221
def base_spec(operation_id, session)
  {
    db_name: database.name,
    coll_name: collection.name,
    write_concern: write_concern(session),
    ordered: ordered?,
    operation_id: operation_id,
    bypass_document_validation: !!options[:bypass_document_validation],
    max_time_ms: options[:max_time_ms],
    options: options,
    id_generator: client.options[:id_generator],
    session: session,
    comment: options[:comment],
    let: options[:let],
  }
end
#calculate_deadline ⇒ Float | nil (private)
# File 'lib/mongo/bulk_write.rb', line 177
def calculate_deadline
  timeout_ms = @options[:timeout_ms] || collection.timeout_ms
  return nil if timeout_ms.nil?

  if timeout_ms == 0
    0
  else
    Utils.monotonic_time + (timeout_ms / 1_000.0)
  end
end
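The arithmetic above can be sketched in plain Ruby. This is a standalone illustration, not the driver's method: `Utils.monotonic_time` is approximated with `Process.clock_gettime(Process::CLOCK_MONOTONIC)`, and the timeout is passed in directly instead of being read from `@options` or the collection.

```ruby
# Sketch of the deadline calculation. A nil timeout means no deadline;
# a timeout of 0 is passed through as the special "no enforcement" value.
def calculate_deadline(timeout_ms, now = Process.clock_gettime(Process::CLOCK_MONOTONIC))
  return nil if timeout_ms.nil?
  return 0 if timeout_ms == 0

  now + (timeout_ms / 1_000.0) # absolute monotonic deadline, in seconds
end

calculate_deadline(nil)          # => nil
calculate_deadline(0)            # => 0
calculate_deadline(2_500, 100.0) # => 102.5
```

Storing an absolute monotonic deadline (rather than the raw timeout) lets each batch in the bulk write compute its remaining budget later via #op_timeout_ms.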
#can_hint?(connection) ⇒ true | false (private)
Loop through the requests and check if each operation is allowed to send a hint for each operation on the given server version.
For the following operations, the client can send a hint on all supported server versions; for all other operations, a hint can be sent only to servers 4.4 and newer:
- updateOne
- updateMany
- replaceOne
# File 'lib/mongo/bulk_write.rb', line 333
def can_hint?(connection)
  gte_4_4 = connection.server.description.server_version_gte?('4.4')
  op_combiner.requests.all? do |req|
    op = req.keys.first
    if req[op].keys.include?(:hint)
      %i[update_one update_many replace_one].include?(op) || gte_4_4
    else
      true
    end
  end
end
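The rule can be exercised in isolation with a plain-Ruby sketch. The request hashes below are hypothetical stand-ins for the combiner's request list, and the server-version check is reduced to a boolean argument:

```ruby
# Operations whose hints are accepted on all supported server versions.
HINT_ALWAYS_ALLOWED = %i[update_one update_many replace_one].freeze

# True if every request is allowed to carry its :hint, given whether
# the server is 4.4 or newer.
def can_hint?(requests, gte_4_4)
  requests.all? do |req|
    op = req.keys.first
    next true unless req[op].key?(:hint) # no hint, nothing to restrict

    HINT_ALWAYS_ALLOWED.include?(op) || gte_4_4
  end
end

requests = [
  { update_one: { filter: { a: 1 }, update: { '$set' => { a: 2 } }, hint: '_id_' } },
  { delete_one: { filter: { a: 1 }, hint: '_id_' } },
]
can_hint?(requests, false) # => false (the delete's hint needs 4.4+)
can_hint?(requests, true)  # => true
```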
#delete_many(documents, connection, context, operation_id, session, _txn_num) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 284
def delete_many(documents, connection, context, operation_id, session, _txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(deletes: documents)
  Operation::Delete.new(spec).bulk_execute(connection, context: context)
end
#delete_one(documents, connection, context, operation_id, session, txn_num) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 277
def delete_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(deletes: documents, txn_num: txn_num)
  Operation::Delete.new(spec).bulk_execute(connection, context: context)
end
#execute ⇒ Mongo::BulkWrite::Result
Execute the bulk write operation.
# File 'lib/mongo/bulk_write.rb', line 57
def execute
  operation_id = Monitoring.next_operation_id
  result_combiner = ResultCombiner.new
  operations = op_combiner.combine
  validate_requests!
  deadline = calculate_deadline

  client.with_session(@options) do |session|
    operations.each do |operation|
      context = Operation::Context.new(
        client: client,
        session: session,
        operation_timeouts: operation_timeouts(deadline)
      )
      if single_statement?(operation)
        write_concern = write_concern(session)
        write_with_retry(write_concern, context: context) do |connection, txn_num, context|
          execute_operation(
            operation.keys.first,
            operation.values.flatten,
            connection,
            context,
            operation_id,
            result_combiner,
            session,
            txn_num
          )
        end
      else
        nro_write_with_retry(write_concern, context: context) do |connection, _txn_num, context|
          execute_operation(
            operation.keys.first,
            operation.values.flatten,
            connection,
            context,
            operation_id,
            result_combiner,
            session
          )
        end
      end
    end
  end
  result_combiner.result
end
#execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 238
def execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil)
  validate_hint!(connection)
  unpin_maybe(session, connection) do
    if values.size > connection.description.max_write_batch_size
      split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
    else
      result = send(name, values, connection, context, operation_id, session, txn_num)

      add_server_diagnostics(connection) do
        add_error_labels(connection, context) do
          result_combiner.combine!(result, values.size)
        end
      end
    end
  end
# The size of each section in the message is independently capped at 16m
# and each bulk operation becomes its own section. The size of the entire
# bulk write is limited to 48m.
rescue Error::MaxBSONSize, Error::MaxMessageSize => e
  raise e if values.size <= 1

  unpin_maybe(session, connection) do
    split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
  end
end
#insert_one(documents, connection, context, operation_id, session, txn_num) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 291
def insert_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(documents: documents, txn_num: txn_num)
  Operation::Insert.new(spec).bulk_execute(connection, context: context)
end
#maybe_first(obj) ⇒ Object (private)
If the given object is an array, return the first element; otherwise return the given object.
# File 'lib/mongo/bulk_write.rb', line 391
def maybe_first(obj)
  obj.is_a?(Array) ? obj.first : obj
end
#op_combiner (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 265
def op_combiner
  @op_combiner ||= ordered? ? OrderedCombiner.new(requests) : UnorderedCombiner.new(requests)
end
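The two combiner classes differ in how they group requests before execution: an ordered bulk write must preserve request order, so only adjacent requests of the same type can be merged into one batch, while an unordered bulk write may group by type across the whole list. A minimal plain-Ruby sketch of the two strategies (these are hypothetical helper functions, not the driver's OrderedCombiner/UnorderedCombiner classes):

```ruby
# Each request is a one-key hash such as { insert_one: {...} };
# both helpers emit { op_name => [payloads] } groups.

# Ordered: merge only adjacent runs of the same operation type.
def combine_ordered(requests)
  requests.chunk_while { |a, b| a.keys.first == b.keys.first }
          .map { |run| { run.first.keys.first => run.map { |r| r.values.first } } }
end

# Unordered: group by operation type regardless of position.
def combine_unordered(requests)
  requests.group_by { |r| r.keys.first }
          .map { |op, run| { op => run.map { |r| r.values.first } } }
end

reqs = [
  { insert_one: { a: 1 } },
  { insert_one: { a: 2 } },
  { delete_one: { filter: { a: 1 } } },
  { insert_one: { a: 3 } },
]
combine_ordered(reqs).size   # => 3 (insert run, delete, insert)
combine_unordered(reqs).size # => 2 (all inserts together, one delete group)
```

Fewer groups means fewer round trips, which is why unordered bulk writes can be cheaper when request order does not matter.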
#op_timeout_ms(deadline) ⇒ Integer | nil (private)
# File 'lib/mongo/bulk_write.rb', line 207
def op_timeout_ms(deadline)
  return nil if deadline.nil?

  if deadline == 0
    0
  else
    ((deadline - Utils.monotonic_time) * 1_000).to_i
  end
end
#operation_timeouts(deadline) (private)
Returns the operation_timeouts hash for creating an Operation::Context. Uses operation_timeout_ms when the timeout was explicitly set on the bulk write, or inherited_timeout_ms when it comes from the collection or client. This distinction is important inside transactions where operation_timeout_ms is not allowed (RUBY-3685).
# File 'lib/mongo/bulk_write.rb', line 193
def operation_timeouts(deadline)
  timeout = op_timeout_ms(deadline)
  return {} if timeout.nil?

  if @options[:timeout_ms]
    { operation_timeout_ms: timeout }
  else
    { inherited_timeout_ms: timeout }
  end
end
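The two steps above (convert the absolute deadline back into a remaining per-batch budget, then label it by origin) can be sketched together in plain Ruby. This is an illustrative stand-in: the `explicit:` flag plays the role of `@options[:timeout_ms]` being set directly on the bulk write, and the clock is an argument so the arithmetic is visible:

```ruby
# Turn an absolute monotonic deadline into the remaining budget in ms,
# keyed by whether the timeout was set explicitly or inherited.
def operation_timeouts(deadline, explicit:, now: Process.clock_gettime(Process::CLOCK_MONOTONIC))
  return {} if deadline.nil? # no timeout configured at all

  remaining_ms = deadline.zero? ? 0 : ((deadline - now) * 1_000).to_i
  explicit ? { operation_timeout_ms: remaining_ms } : { inherited_timeout_ms: remaining_ms }
end

operation_timeouts(nil, explicit: true)               # => {}
operation_timeouts(105.0, explicit: true, now: 100.0) # => { operation_timeout_ms: 5000 }
operation_timeouts(105.0, explicit: false, now: 100.0) # => { inherited_timeout_ms: 5000 }
```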
#replace_one(documents, connection, context, operation_id, session, txn_num) (private)
Alias for #update_one.
# File 'lib/mongo/bulk_write.rb', line 304
alias replace_one update_one
#single_statement?(operation) ⇒ Boolean (private)
# File 'lib/mongo/bulk_write.rb', line 217
def single_statement?(operation)
  SINGLE_STATEMENT_OPS.include?(operation.keys.first)
end
#split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 269
def split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
  execute_operation(name, values.shift(values.size / 2), connection, context, operation_id, result_combiner, session, txn_num)

  txn_num = session.next_txn_num if txn_num && !session.in_transaction?
  execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num)
end
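Note the destructive Array#shift: it removes the first half of the batch in place, so the second recursive call sees only the remainder. A two-line demonstration of that halving step:

```ruby
# Array#shift(n) removes and returns the first n elements in place,
# so one call handles each half of an oversized batch.
values = (1..5).to_a
first_half = values.shift(values.size / 2)

first_half # => [1, 2]  (executed first)
values     # => [3, 4, 5] (the remainder, passed to the second call)
```

Because each half can itself exceed the server's limits, execute_operation may call split_execute again, halving recursively until every batch fits.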
#update_many(documents, connection, context, operation_id, session, _txn_num) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 306
def update_many(documents, connection, context, operation_id, session, _txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(updates: documents)
  Operation::Update.new(spec).bulk_execute(connection, context: context)
end
#update_one(documents, connection, context, operation_id, session, txn_num) (private) Also known as: #replace_one
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 298
def update_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(updates: documents, txn_num: txn_num)
  Operation::Update.new(spec).bulk_execute(connection, context: context)
end
#validate_hint!(connection) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 313
def validate_hint!(connection)
  return unless op_combiner.has_hint?
  return unless !can_hint?(connection) && write_concern && !write_concern.acknowledged?

  raise Error::UnsupportedOption.hint_error(unacknowledged_write: true)
end
#validate_requests! (private)
Perform the request document validation required by driver specifications. This method validates the first key of each update request document to be an operator (i.e. start with $) and the first key of each replacement document to not be an operator (i.e. not start with $). The request document may be invalid without this method flagging it as such (for example an update or replacement document containing some keys which are operators and some which are not), in which case the driver expects the server to fail the operation with an error.
Raise an ArgumentError if requests is empty.
# File 'lib/mongo/bulk_write.rb', line 359
def validate_requests!
  requests_empty = true
  @requests.each do |req|
    requests_empty = false
    if op = req.keys.first
      if %i[update_one update_many].include?(op)
        if (doc = maybe_first(req.dig(op, :update))) &&
           (key = doc.keys&.first) &&
           !key.to_s.start_with?('$')
          raise Error::InvalidUpdateDocument.new(key: key) if Mongo.validate_update_replace

          Error::InvalidUpdateDocument.warn(Logger.logger, key)
        end
      elsif op == :replace_one
        if (key = req.dig(op, :replacement)&.keys&.first) &&
           key.to_s.start_with?('$')
          raise Error::InvalidReplacementDocument.new(key: key) if Mongo.validate_update_replace

          Error::InvalidReplacementDocument.warn(Logger.logger, key)
        end
      end
    end
  end.tap do
    raise ArgumentError, 'Bulk write requests cannot be empty' if requests_empty
  end
end
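The first-key rule the method enforces can be checked in isolation: update documents must begin with an operator (a key starting with `$`), while replacement documents must not. A minimal sketch of that predicate (a hypothetical helper, not part of the driver):

```ruby
# True when the document's first key is an update operator such as $set.
def operator_first_key?(doc)
  key = doc.keys.first
  !key.nil? && key.to_s.start_with?('$')
end

operator_first_key?('$set' => { a: 1 }) # => true  (valid as an update document)
operator_first_key?(a: 1)               # => false (valid as a replacement document)
```

As the prose above notes, this only inspects the first key; mixed documents pass client-side validation and are left for the server to reject.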
#write_concern(session = nil) ⇒ WriteConcern
Get the write concern for the bulk write.
# File 'lib/mongo/bulk_write.rb', line 162
def write_concern(session = nil)
  @write_concern ||= if options[:write_concern]
                       WriteConcern.get(options[:write_concern])
                     else
                       collection.write_concern_with_session(session)
                     end
end