Class: Mongo::BulkWrite
Class Chain: self, Forwardable
Inherits: Object
| Defined in: | lib/mongo/bulk_write.rb, lib/mongo/bulk_write/combineable.rb, lib/mongo/bulk_write/ordered_combiner.rb, lib/mongo/bulk_write/result.rb, lib/mongo/bulk_write/result_combiner.rb, lib/mongo/bulk_write/transformable.rb, lib/mongo/bulk_write/unordered_combiner.rb, lib/mongo/bulk_write/validatable.rb |
Constant Summary
SINGLE_STATEMENT_OPS =
  # File 'lib/mongo/bulk_write.rb', line 169
  [ :delete_one, :update_one, :insert_one ].freeze
Class Method Summary
- .new(collection, requests, options = {}) ⇒ BulkWrite constructor
  Internal use only
  Create the new bulk write operation.
Instance Attribute Summary
- #collection ⇒ Mongo::Collection readonly
- #options ⇒ Hash, BSON::Document readonly
- #ordered? ⇒ true, false readonly
  Internal use only
  Is the bulk write ordered?
- #requests ⇒ Array<Hash, BSON::Document> readonly
Instance Method Summary
- #execute ⇒ Mongo::BulkWrite::Result
  Execute the bulk write operation.
- #write_concern(session = nil) ⇒ WriteConcern
  Internal use only
  Get the write concern for the bulk write.
- #base_spec(operation_id, session) private
- #calculate_deadline ⇒ Float | nil private
- #can_hint?(connection) ⇒ true | false private
  Loop through the requests and check whether each operation is allowed to send a hint to a server of the given version.
- #delete_many(documents, connection, context, operation_id, session, txn_num) private
- #delete_one(documents, connection, context, operation_id, session, txn_num) private
- #execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil) private
- #insert_one(documents, connection, context, operation_id, session, txn_num) private
- #maybe_first(obj) ⇒ Object private
  If the given object is an array, return its first element; otherwise return the object itself.
- #op_combiner private
- #op_timeout_ms(deadline) ⇒ Integer | nil private
- #operation_timeouts(deadline) private
  Returns the operation_timeouts hash for creating an Operation::Context.
- #replace_one(documents, connection, context, operation_id, session, txn_num) private
  Alias for #update_one.
- #single_statement?(operation) ⇒ Boolean private
- #split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num) private
- #update_many(documents, connection, context, operation_id, session, txn_num) private
- #update_one(documents, connection, context, operation_id, session, txn_num) (also: #replace_one) private
- #validate_hint!(connection) private
- #validate_requests! private
  Perform the request document validation required by driver specifications.
Operation::ResponseHandling - Included
| #add_error_labels | Adds error labels to exceptions raised in the yielded to block, which should perform MongoDB operations and raise Mongo::Errors on failure. |
| #add_server_diagnostics | Yields to the block and, if the block raises an exception, adds a note to the exception with the address of the specified server. |
| #maybe_add_retryable_write_error_label! | A method that will add the RetryableWriteError label to an error if any of the following conditions are true: |
| #unpin_maybe | Unpins the session and/or the connection if the yielded to block raises errors that are required to unpin the session and the connection. |
| #validate_result | |
Instance Attribute Details
#collection ⇒ Mongo::Collection (readonly)
# File 'lib/mongo/bulk_write.rb', line 32
attr_reader :collection
#options ⇒ Hash, BSON::Document (readonly)
# File 'lib/mongo/bulk_write.rb', line 38
attr_reader :options
#ordered? ⇒ true, false (readonly)
Is the bulk write ordered?
# File 'lib/mongo/bulk_write.rb', line 147
def ordered?
  @ordered ||= options.fetch(:ordered, true)
end
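The default can be exercised without a driver at all. In this plain-Ruby sketch (the standalone `ordered?` helper taking an options hash is illustrative, not driver API), `Hash#fetch` supplies `true` whenever the caller did not pass `:ordered` explicitly:

```ruby
# Mimics the memoized default above: a bulk write is ordered
# unless the caller explicitly passed ordered: false.
def ordered?(options)
  options.fetch(:ordered, true)
end

ordered?({})               # => true
ordered?(ordered: false)   # => false
ordered?(ordered: true)    # => true
```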
#requests ⇒ Array<Hash, BSON::Document> (readonly)
# File 'lib/mongo/bulk_write.rb', line 35
attr_reader :requests
Instance Method Details
#base_spec(operation_id, session) (private)
# File 'lib/mongo/bulk_write.rb', line 218
def base_spec(operation_id, session)
  {
    :db_name => database.name,
    :coll_name => collection.name,
    :write_concern => write_concern(session),
    :ordered => ordered?,
    :operation_id => operation_id,
    :bypass_document_validation => !!options[:bypass_document_validation],
    :max_time_ms => options[:max_time_ms],
    :options => options,
    :id_generator => client.options[:id_generator],
    :session => session,
    :comment => options[:comment],
    :let => options[:let],
  }
end
#calculate_deadline ⇒ Float | nil (private)
# File 'lib/mongo/bulk_write.rb', line 174
def calculate_deadline
  timeout_ms = @options[:timeout_ms] || collection.timeout_ms
  return nil if timeout_ms.nil?

  if timeout_ms == 0
    0
  else
    Utils.monotonic_time + (timeout_ms / 1_000.0)
  end
end
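The deadline arithmetic can be sketched in isolation. In this plain-Ruby version (the `now` parameter stands in for `Utils.monotonic_time` and is an assumption of the sketch): `nil` means no timeout at all, `0` is passed through unchanged, and any other value becomes an absolute monotonic deadline in seconds:

```ruby
# Standalone sketch of the deadline computation above.
def calculate_deadline(timeout_ms, now)
  return nil if timeout_ms.nil?

  timeout_ms == 0 ? 0 : now + (timeout_ms / 1_000.0)
end

calculate_deadline(nil, 100.0)   # => nil (no timeout configured)
calculate_deadline(0, 100.0)     # => 0   (passed through)
calculate_deadline(2_500, 100.0) # => 102.5
```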
#can_hint?(connection) ⇒ true | false (private)
Loop through the requests and check whether each operation is allowed to send a hint to a server of the given version.
For the following operations, the client can send a hint for all supported server versions, and for the rest, the client can only send it for 4.4+:
- updateOne
- updateMany
- replaceOne
# File 'lib/mongo/bulk_write.rb', line 331
def can_hint?(connection)
  gte_4_4 = connection.server.description.server_version_gte?('4.4')
  op_combiner.requests.all? do |req|
    op = req.keys.first
    if req[op].keys.include?(:hint)
      if [:update_one, :update_many, :replace_one].include?(op)
        true
      else
        gte_4_4
      end
    else
      true
    end
  end
end
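The gating rule reduces to a small predicate. This plain-Ruby sketch (the `server_gte_4_4` flag replaces the connection lookup; names here are illustrative, not driver API) shows that update-family operations may always carry a hint, while any other operation with a hint requires a 4.4+ server:

```ruby
# Operations for which a hint is accepted on all supported server versions.
HINT_ALWAYS_OK = [:update_one, :update_many, :replace_one].freeze

def can_hint?(requests, server_gte_4_4)
  requests.all? do |req|
    op = req.keys.first
    next true unless req[op].key?(:hint)  # no hint, nothing to gate

    HINT_ALWAYS_OK.include?(op) || server_gte_4_4
  end
end

hinted_delete = [{ delete_one: { filter: {}, hint: '_id_' } }]
can_hint?(hinted_delete, false) # => false (delete hint needs 4.4+)
can_hint?(hinted_delete, true)  # => true
```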
#delete_many(documents, connection, context, operation_id, session, txn_num) (private)
# File 'lib/mongo/bulk_write.rb', line 279
def delete_many(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(:deletes => documents)
  Operation::Delete.new(spec).bulk_execute(connection, context: context)
end
#delete_one(documents, connection, context, operation_id, session, txn_num) (private)
# File 'lib/mongo/bulk_write.rb', line 272
def delete_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(:deletes => documents, :txn_num => txn_num)
  Operation::Delete.new(spec).bulk_execute(connection, context: context)
end
#execute ⇒ Mongo::BulkWrite::Result
Execute the bulk write operation.
# File 'lib/mongo/bulk_write.rb', line 58
def execute
  operation_id = Monitoring.next_operation_id
  result_combiner = ResultCombiner.new
  operations = op_combiner.combine
  validate_requests!
  deadline = calculate_deadline

  client.with_session(@options) do |session|
    operations.each do |operation|
      context = Operation::Context.new(
        client: client,
        session: session,
        operation_timeouts: operation_timeouts(deadline)
      )
      if single_statement?(operation)
        write_concern = write_concern(session)
        write_with_retry(write_concern, context: context) do |connection, txn_num, context|
          execute_operation(
            operation.keys.first,
            operation.values.flatten,
            connection, context, operation_id, result_combiner, session, txn_num)
        end
      else
        nro_write_with_retry(write_concern, context: context) do |connection, txn_num, context|
          execute_operation(
            operation.keys.first,
            operation.values.flatten,
            connection, context, operation_id, result_combiner, session)
        end
      end
    end
  end
  result_combiner.result
end
#execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil) (private)
# File 'lib/mongo/bulk_write.rb', line 235
def execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil)
  validate_hint!(connection)
  unpin_maybe(session, connection) do
    if values.size > connection.description.max_write_batch_size
      split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
    else
      result = send(name, values, connection, context, operation_id, session, txn_num)

      add_server_diagnostics(connection) do
        add_error_labels(connection, context) do
          result_combiner.combine!(result, values.size)
        end
      end
    end
  end
# The size of each section in the message is independently capped at 16m
# and each bulk operation becomes its own section. The size of the entire
# bulk write is limited to 48m.
rescue Error::MaxBSONSize, Error::MaxMessageSize => e
  raise e if values.size <= 1

  unpin_maybe(session, connection) do
    split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
  end
end
#insert_one(documents, connection, context, operation_id, session, txn_num) (private)
# File 'lib/mongo/bulk_write.rb', line 286
def insert_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(:documents => documents, :txn_num => txn_num)
  Operation::Insert.new(spec).bulk_execute(connection, context: context)
end
#maybe_first(obj) ⇒ Object (private)
If the given object is an array return the first element, otherwise return the given object.
# File 'lib/mongo/bulk_write.rb', line 401
def maybe_first(obj)
  obj.is_a?(Array) ? obj.first : obj
end
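The helper is small enough to exercise directly. It lets the update validation treat array-form updates (aggregation pipelines, where only the first stage is checked) and plain update documents uniformly:

```ruby
# Unwrap the first element of an array, pass anything else through.
def maybe_first(obj)
  obj.is_a?(Array) ? obj.first : obj
end

maybe_first([{ '$set' => { a: 1 } }, { '$unset' => 'b' }]) # => { '$set' => { a: 1 } }
maybe_first({ '$set' => { a: 1 } })                        # => { '$set' => { a: 1 } }
maybe_first(nil)                                           # => nil
```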
#op_combiner (private)
# File 'lib/mongo/bulk_write.rb', line 261
def op_combiner
  @op_combiner ||= ordered? ? OrderedCombiner.new(requests) : UnorderedCombiner.new(requests)
end
#op_timeout_ms(deadline) ⇒ Integer | nil (private)
# File 'lib/mongo/bulk_write.rb', line 204
def op_timeout_ms(deadline)
  return nil if deadline.nil?

  if deadline == 0
    0
  else
    ((deadline - Utils.monotonic_time) * 1_000).to_i
  end
end
#operation_timeouts(deadline) (private)
Returns the operation_timeouts hash for creating an Operation::Context. Uses operation_timeout_ms when the timeout was explicitly set on the bulk write, or inherited_timeout_ms when it comes from the collection or client. This distinction is important inside transactions where operation_timeout_ms is not allowed (RUBY-3685).
# File 'lib/mongo/bulk_write.rb', line 190
def operation_timeouts(deadline)
  timeout = op_timeout_ms(deadline)
  return {} if timeout.nil?

  if @options[:timeout_ms]
    { operation_timeout_ms: timeout }
  else
    { inherited_timeout_ms: timeout }
  end
end
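The key selection can be sketched standalone. In this plain-Ruby version (the `explicitly_set` flag stands in for checking `@options[:timeout_ms]`; names are illustrative, not driver API), a timeout set directly on the bulk write becomes `operation_timeout_ms`, while one inherited from the collection or client becomes `inherited_timeout_ms`:

```ruby
# Standalone sketch of the operation_timeouts key selection above.
def operation_timeouts(timeout_ms, explicitly_set)
  return {} if timeout_ms.nil?

  if explicitly_set
    { operation_timeout_ms: timeout_ms }
  else
    { inherited_timeout_ms: timeout_ms }
  end
end

operation_timeouts(nil, false) # => {}
operation_timeouts(500, true)  # => { operation_timeout_ms: 500 }
operation_timeouts(500, false) # => { inherited_timeout_ms: 500 }
```

The distinction matters because, per the description above, `operation_timeout_ms` is disallowed inside transactions.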
#replace_one(documents, connection, context, operation_id, session, txn_num) (private)
Alias for #update_one.
# File 'lib/mongo/bulk_write.rb', line 299
alias :replace_one :update_one
#single_statement?(operation) ⇒ Boolean (private)
# File 'lib/mongo/bulk_write.rb', line 214
def single_statement?(operation)
  SINGLE_STATEMENT_OPS.include?(operation.keys.first)
end
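This predicate is what routes each combined operation in #execute to either the retryable-write path (`write_with_retry`) or the non-retryable path (`nro_write_with_retry`). A self-contained sketch using the constant from the summary above:

```ruby
# Operations that touch exactly one document and are retryable.
SINGLE_STATEMENT_OPS = [:delete_one, :update_one, :insert_one].freeze

def single_statement?(operation)
  SINGLE_STATEMENT_OPS.include?(operation.keys.first)
end

single_statement?(insert_one: { document: { _id: 1 } }) # => true
single_statement?(delete_many: { filter: {} })          # => false
```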
#split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num) (private)
# File 'lib/mongo/bulk_write.rb', line 265
def split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
  execute_operation(name, values.shift(values.size / 2), connection, context, operation_id, result_combiner, session, txn_num)

  txn_num = session.next_txn_num if txn_num && !session.in_transaction?
  execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num)
end
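Because #execute_operation calls back into the splitter whenever a batch exceeds the server's limit, oversized batches are halved recursively until every piece fits. A standalone sketch of just that halving strategy (`split_batches`, `max_batch_size`, and the block are illustrative stand-ins, not driver API):

```ruby
# Recursively halve a batch until each piece is within the limit,
# yielding each final batch in order. Note values.shift mutates the
# array, mirroring the destructive shift in split_execute.
def split_batches(values, max_batch_size, &on_batch)
  if values.size > max_batch_size
    split_batches(values.shift(values.size / 2), max_batch_size, &on_batch)
    split_batches(values, max_batch_size, &on_batch)
  else
    on_batch.call(values)
  end
end

sizes = []
split_batches((1..10).to_a, 3) { |batch| sizes << batch.size }
sizes # => [2, 3, 2, 3]
```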
#update_many(documents, connection, context, operation_id, session, txn_num) (private)
# File 'lib/mongo/bulk_write.rb', line 301
def update_many(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(:updates => documents)
  Operation::Update.new(spec).bulk_execute(connection, context: context)
end
#update_one(documents, connection, context, operation_id, session, txn_num) (private) Also known as: #replace_one
# File 'lib/mongo/bulk_write.rb', line 293
def update_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(:updates => documents, :txn_num => txn_num)
  Operation::Update.new(spec).bulk_execute(connection, context: context)
end
#validate_hint!(connection) (private)
# File 'lib/mongo/bulk_write.rb', line 310
def validate_hint!(connection)
  if op_combiner.has_hint?
    if !can_hint?(connection) && write_concern && !write_concern.acknowledged?
      raise Error::UnsupportedOption.hint_error(unacknowledged_write: true)
    end
  end
end
#validate_requests! (private)
Perform the request document validation required by driver specifications. This method validates the first key of each update request document to be an operator (i.e. start with $) and the first key of each replacement document to not be an operator (i.e. not start with $). The request document may be invalid without this method flagging it as such (for example an update or replacement document containing some keys which are operators and some which are not), in which case the driver expects the server to fail the operation with an error.
Raise an ArgumentError if requests is empty.
# File 'lib/mongo/bulk_write.rb', line 361
def validate_requests!
  requests_empty = true
  @requests.each do |req|
    requests_empty = false
    if op = req.keys.first
      if [:update_one, :update_many].include?(op)
        if doc = maybe_first(req.dig(op, :update))
          if key = doc.keys&.first
            unless key.to_s.start_with?("$")
              if Mongo.validate_update_replace
                raise Error::InvalidUpdateDocument.new(key: key)
              else
                Error::InvalidUpdateDocument.warn(Logger.logger, key)
              end
            end
          end
        end
      elsif op == :replace_one
        if key = req.dig(op, :replacement)&.keys&.first
          if key.to_s.start_with?("$")
            if Mongo.validate_update_replace
              raise Error::InvalidReplacementDocument.new(key: key)
            else
              Error::InvalidReplacementDocument.warn(Logger.logger, key)
            end
          end
        end
      end
    end
  end.tap do
    raise ArgumentError, "Bulk write requests cannot be empty" if requests_empty
  end
end
#write_concern(session = nil) ⇒ WriteConcern
Get the write concern for the bulk write.
# File 'lib/mongo/bulk_write.rb', line 161
def write_concern(session = nil)
  @write_concern ||= options[:write_concern] ?
    WriteConcern.get(options[:write_concern]) :
    collection.write_concern_with_session(session)
end