Class: Mongo::BulkWrite
Relationships & Source Files | |
Namespace Children | |
Modules:
| |
Classes:
| |
Super Chains via Extension / Inclusion / Inheritance | |
Class Chain:
self,
Forwardable
|
|
Instance Chain:
|
|
Inherits: | Object |
Defined in: | lib/mongo/bulk_write.rb, lib/mongo/bulk_write/combineable.rb, lib/mongo/bulk_write/ordered_combiner.rb, lib/mongo/bulk_write/result.rb, lib/mongo/bulk_write/result_combiner.rb, lib/mongo/bulk_write/transformable.rb, lib/mongo/bulk_write/unordered_combiner.rb, lib/mongo/bulk_write/validatable.rb |
Constant Summary
-
SINGLE_STATEMENT_OPS =
# File 'lib/mongo/bulk_write.rb', line 169[ :delete_one, :update_one, :insert_one ].freeze
Class Method Summary
-
.new(collection, requests, options = {}) ⇒ BulkWrite
constructor
Internal use only
Internal use only
Create the new bulk write operation.
Instance Attribute Summary
- #collection ⇒ Mongo::Collection readonly
- #options ⇒ Hash, BSON::Document readonly
-
#ordered? ⇒ true, false
readonly
Internal use only
Internal use only
Is the bulk write ordered?
- #requests ⇒ Array<Hash, BSON::Document> readonly
Instance Method Summary
-
#execute ⇒ Mongo::BulkWrite::Result
Execute the bulk write operation.
-
#write_concern(session = nil) ⇒ WriteConcern
Internal use only
Internal use only
Get the write concern for the bulk write.
- #base_spec(operation_id, session) private
- #calculate_deadline ⇒ Float | nil private
-
#can_hint?(connection) ⇒ true | false
private
Loop through the requests and check if each operation is allowed to send a hint for each operation on the given server version.
- #delete_many(documents, connection, context, operation_id, session, txn_num) private
- #delete_one(documents, connection, context, operation_id, session, txn_num) private
- #execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil) private
- #insert_one(documents, connection, context, operation_id, session, txn_num) private
-
#maybe_first(obj) ⇒ Object
private
If the given object is an array return the first element, otherwise return the given object.
- #op_combiner private
- #op_timeout_ms(deadline) ⇒ Integer | nil private
-
#replace_one(documents, connection, context, operation_id, session, txn_num)
private
Alias for #update_one.
- #single_statement?(operation) ⇒ Boolean private
- #split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num) private
- #update_many(documents, connection, context, operation_id, session, txn_num) private
- #update_one(documents, connection, context, operation_id, session, txn_num) (also: #replace_one) private
- #validate_array_filters!(connection) private
- #validate_collation!(connection) private
- #validate_hint!(connection) private
-
#validate_requests!
private
Perform the request document validation required by driver specifications.
Operation::ResponseHandling
- Included
#add_error_labels | Adds error labels to exceptions raised in the yielded to block, which should perform MongoDB operations and raise Mongo::Errors on failure. |
#add_server_diagnostics | Yields to the block and, if the block raises an exception, adds a note to the exception with the address of the specified server. |
#maybe_add_retryable_write_error_label! | A method that will add the RetryableWriteError label to an error if any of the following conditions are true: |
#unpin_maybe | Unpins the session and/or the connection if the yielded to block raises errors that are required to unpin the session and the connection. |
#validate_result |
Instance Attribute Details
#collection ⇒ Mongo::Collection (readonly)
# File 'lib/mongo/bulk_write.rb', line 32
attr_reader :collection
#options ⇒ Hash
, BSON::Document
(readonly)
# File 'lib/mongo/bulk_write.rb', line 38
attr_reader :
#ordered? ⇒ true
, false
(readonly)
Is the bulk write ordered?
# File 'lib/mongo/bulk_write.rb', line 147
def ordered? @ordered ||= .fetch(:ordered, true) end
#requests ⇒ Array
<Hash
, BSON::Document
> (readonly)
# File 'lib/mongo/bulk_write.rb', line 35
attr_reader :requests
Instance Method Details
#base_spec(operation_id, session) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 202
def base_spec(operation_id, session) { :db_name => database.name, :coll_name => collection.name, :write_concern => write_concern(session), :ordered => ordered?, :operation_id => operation_id, :bypass_document_validation => !! [:bypass_document_validation], :max_time_ms => [:max_time_ms], : => , :id_generator => client. [:id_generator], :session => session, :comment => [:comment], :let => [:let], } end
#calculate_deadline ⇒ Float
| nil
(private)
# File 'lib/mongo/bulk_write.rb', line 174
def calculate_deadline timeout_ms = @options[:timeout_ms] || collection.timeout_ms return nil if timeout_ms.nil? if timeout_ms == 0 0 else Utils.monotonic_time + (timeout_ms / 1_000.0) end end
#can_hint?(connection) ⇒ true
| false
(private)
Loop through the requests and check if each operation is allowed to send a hint for each operation on the given server version.
For the following operations, the client can send a hint for servers >= 4.2 and for the rest, the client can only send it for 4.4+:
- updateOne
- updateMany
- replaceOne
# File 'lib/mongo/bulk_write.rb', line 335
def can_hint?(connection) gte_4_2 = connection.server.description.server_version_gte?('4.2') gte_4_4 = connection.server.description.server_version_gte?('4.4') op_combiner.requests.all? do |req| op = req.keys.first if req[op].keys.include?(:hint) if [:update_one, :update_many, :replace_one].include?(op) gte_4_2 else gte_4_4 end else true end end end
#delete_many(documents, connection, context, operation_id, session, txn_num) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 269
def delete_many(documents, connection, context, operation_id, session, txn_num) QueryCache.clear_namespace(collection.namespace) spec = base_spec(operation_id, session).merge(:deletes => documents) Operation::Delete.new(spec).bulk_execute(connection, context: context) end
#delete_one(documents, connection, context, operation_id, session, txn_num) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 262
def delete_one(documents, connection, context, operation_id, session, txn_num) QueryCache.clear_namespace(collection.namespace) spec = base_spec(operation_id, session).merge(:deletes => documents, :txn_num => txn_num) Operation::Delete.new(spec).bulk_execute(connection, context: context) end
#execute ⇒ Mongo::BulkWrite::Result
Execute the bulk write operation.
# File 'lib/mongo/bulk_write.rb', line 58
def execute operation_id = Monitoring.next_operation_id result_combiner = ResultCombiner.new operations = op_combiner.combine validate_requests! deadline = calculate_deadline client.with_session(@options) do |session| operations.each do |operation| context = Operation::Context.new( client: client, session: session, operation_timeouts: { operation_timeout_ms: op_timeout_ms(deadline) } ) if single_statement?(operation) write_concern = write_concern(session) write_with_retry(write_concern, context: context) do |connection, txn_num, context| execute_operation( operation.keys.first, operation.values.flatten, connection, context, operation_id, result_combiner, session, txn_num) end else nro_write_with_retry(write_concern, context: context) do |connection, txn_num, context| execute_operation( operation.keys.first, operation.values.flatten, connection, context, operation_id, result_combiner, session) end end end end result_combiner.result end
#execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 219
def execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil) validate_collation!(connection) validate_array_filters!(connection) validate_hint!(connection) unpin_maybe(session, connection) do if values.size > connection.description.max_write_batch_size split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num) else result = send(name, values, connection, context, operation_id, session, txn_num) add_server_diagnostics(connection) do add_error_labels(connection, context) do result_combiner.combine!(result, values.size) end end end end # With OP_MSG (3.6+ servers), the size of each section in the message # is independently capped at 16m and each bulk operation becomes # its own section. The size of the entire bulk write is limited to 48m. # With OP_QUERY (pre-3.6 servers), the entire bulk write is sent as a # single document and is thus subject to the 16m document size limit. # This means the splits differ between pre-3.6 and 3.6+ servers, with # 3.6+ servers being able to split less. rescue Error::MaxBSONSize, Error::MaxMessageSize => e raise e if values.size <= 1 unpin_maybe(session, connection) do split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num) end end
#insert_one(documents, connection, context, operation_id, session, txn_num) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 276
def insert_one(documents, connection, context, operation_id, session, txn_num) QueryCache.clear_namespace(collection.namespace) spec = base_spec(operation_id, session).merge(:documents => documents, :txn_num => txn_num) Operation::Insert.new(spec).bulk_execute(connection, context: context) end
#maybe_first(obj) ⇒ Object
(private)
If the given object is an array return the first element, otherwise return the given object.
# File 'lib/mongo/bulk_write.rb', line 406
def maybe_first(obj) obj.is_a?(Array) ? obj.first : obj end
#op_combiner (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 251
def op_combiner @op_combiner ||= ordered? ? OrderedCombiner.new(requests) : UnorderedCombiner.new(requests) end
#op_timeout_ms(deadline) ⇒ Integer
| nil
(private)
# File 'lib/mongo/bulk_write.rb', line 188
def op_timeout_ms(deadline) return nil if deadline.nil? if deadline == 0 0 else ((deadline - Utils.monotonic_time) * 1_000).to_i end end
#replace_one(documents, connection, context, operation_id, session, txn_num) (private)
Alias for #update_one.
# File 'lib/mongo/bulk_write.rb', line 289
alias :replace_one :update_one
#single_statement?(operation) ⇒ Boolean
(private)
# File 'lib/mongo/bulk_write.rb', line 198
def single_statement?(operation) SINGLE_STATEMENT_OPS.include?(operation.keys.first) end
#split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 255
def split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num) execute_operation(name, values.shift(values.size / 2), connection, context, operation_id, result_combiner, session, txn_num) txn_num = session.next_txn_num if txn_num && !session.in_transaction? execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num) end
#update_many(documents, connection, context, operation_id, session, txn_num) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 291
def update_many(documents, connection, context, operation_id, session, txn_num) QueryCache.clear_namespace(collection.namespace) spec = base_spec(operation_id, session).merge(:updates => documents) Operation::Update.new(spec).bulk_execute(connection, context: context) end
#update_one(documents, connection, context, operation_id, session, txn_num) (private) Also known as: #replace_one
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 283
def update_one(documents, connection, context, operation_id, session, txn_num) QueryCache.clear_namespace(collection.namespace) spec = base_spec(operation_id, session).merge(:updates => documents, :txn_num => txn_num) Operation::Update.new(spec).bulk_execute(connection, context: context) end
#validate_array_filters!(connection) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 306
def validate_array_filters!(connection) if op_combiner.has_array_filters? && !connection.features.array_filters_enabled? raise Error::UnsupportedArrayFilters.new end end
#validate_collation!(connection) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 300
def validate_collation!(connection) if op_combiner.has_collation? && !connection.features.collation_enabled? raise Error::UnsupportedCollation.new end end
#validate_hint!(connection) (private)
[ GitHub ]# File 'lib/mongo/bulk_write.rb', line 312
def validate_hint!(connection) if op_combiner.has_hint? if !can_hint?(connection) && write_concern && !write_concern.acknowledged? raise Error::UnsupportedOption.hint_error(unacknowledged_write: true) elsif !connection.features.update_delete_option_validation_enabled? raise Error::UnsupportedOption.hint_error end end end
#validate_requests! (private)
Perform the request document validation required by driver specifications. This method validates the first key of each update request document to be an operator (i.e. start with $) and the first key of each replacement document to not be an operator (i.e. not start with $). The request document may be invalid without this method flagging it as such (for example an update or replacement document containing some keys which are operators and some which are not), in which case the driver expects the server to fail the operation with an error.
Raise an ArgumentError if requests is empty.
# File 'lib/mongo/bulk_write.rb', line 366
def validate_requests! requests_empty = true @requests.each do |req| requests_empty = false if op = req.keys.first if [:update_one, :update_many].include?(op) if doc = maybe_first(req.dig(op, :update)) if key = doc.keys&.first unless key.to_s.start_with?("$") if Mongo.validate_update_replace raise Error::InvalidUpdateDocument.new(key: key) else Error::InvalidUpdateDocument.warn(Logger.logger, key) end end end end elsif op == :replace_one if key = req.dig(op, :replacement)&.keys&.first if key.to_s.start_with?("$") if Mongo.validate_update_replace raise Error::InvalidReplacementDocument.new(key: key) else Error::InvalidReplacementDocument.warn(Logger.logger, key) end end end end end end.tap do raise ArgumentError, "Bulk write requests cannot be empty" if requests_empty end end
#write_concern(session = nil) ⇒ WriteConcern
Get the write concern for the bulk write.
# File 'lib/mongo/bulk_write.rb', line 161
def write_concern(session = nil) @write_concern ||= [:write_concern] ? WriteConcern.get( [:write_concern]) : collection.write_concern_with_session(session) end