
Class: Mongo::BulkWrite


Instance Method Summary

Operation::ResponseHandling - Included

#add_error_labels

Adds error labels to exceptions raised in the yielded-to block, which should perform MongoDB operations and raise Mongo::Errors on failure.

#add_server_diagnostics

Yields to the block and, if the block raises an exception, adds a note to the exception with the address of the specified server.

#maybe_add_retryable_write_error_label!

A method that will add the RetryableWriteError label to an error if certain conditions are met.

#unpin_maybe

Unpins the session and/or the connection if the yielded-to block raises errors that require the session and the connection to be unpinned.

#validate_result

Instance Attribute Details

#collection ⇒ Mongo::Collection (readonly)

Returns:

  • (Mongo::Collection)

    collection The collection.

# File 'lib/mongo/bulk_write.rb', line 31

attr_reader :collection

#options ⇒ Hash, BSON::Document (readonly)

Returns:

  • (Hash, BSON::Document)

    options The options.

# File 'lib/mongo/bulk_write.rb', line 37

attr_reader :options

#ordered? ⇒ true, false (readonly)

This method is for internal use only.

Is the bulk write ordered?

Examples:

Is the bulk write ordered?

bulk_write.ordered?

Returns:

  • (true, false)

    If the bulk write is ordered.

Since:

  • 2.1.0

# File 'lib/mongo/bulk_write.rb', line 148

def ordered?
  @ordered ||= options.fetch(:ordered, true)
end
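
The `fetch(:ordered, true)` lookup means a bulk write is ordered unless the caller explicitly passes `ordered: false`. A minimal, driver-independent sketch of that default (the method name here is illustrative, not driver API):

```ruby
# Sketch of the ordered? default: :ordered falls back to true when unset.
def ordered_option?(options)
  options.fetch(:ordered, true)
end

ordered_option?({})                # => true (ordered by default)
ordered_option?(ordered: false)    # => false
```

Note that the driver memoizes the result with `||=`, so a `false` value is recomputed on each call; this is benign since the `fetch` is cheap.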

#requests ⇒ Array<Hash, BSON::Document> (readonly)

Returns:

  • (Array<Hash, BSON::Document>)

    requests The requests.

# File 'lib/mongo/bulk_write.rb', line 34

attr_reader :requests

Instance Method Details

#base_spec(operation_id, session) (private)

# File 'lib/mongo/bulk_write.rb', line 221

def base_spec(operation_id, session)
  {
    db_name: database.name,
    coll_name: collection.name,
    write_concern: write_concern(session),
    ordered: ordered?,
    operation_id: operation_id,
    bypass_document_validation: !!options[:bypass_document_validation],
    max_time_ms: options[:max_time_ms],
    options: options,
    id_generator: client.options[:id_generator],
    session: session,
    comment: options[:comment],
    let: options[:let],
  }
end

#calculate_deadline ⇒ Float | nil (private)

Returns:

  • (Float | nil)

    Deadline for the batch of operations, if set.

# File 'lib/mongo/bulk_write.rb', line 177

def calculate_deadline
  timeout_ms = @options[:timeout_ms] || collection.timeout_ms
  return nil if timeout_ms.nil?

  if timeout_ms == 0
    0
  else
    Utils.monotonic_time + (timeout_ms / 1_000.0)
  end
end
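
The arithmetic converts a millisecond budget into an absolute monotonic timestamp, with `0` preserved as a "no timeout" sentinel. A standalone sketch (the method name is illustrative, not driver API):

```ruby
# Convert a timeout budget in ms into an absolute monotonic deadline.
# timeout_ms == 0 is a sentinel meaning "no timeout"; nil means "unset".
def deadline_for(timeout_ms, now = Process.clock_gettime(Process::CLOCK_MONOTONIC))
  return nil if timeout_ms.nil?
  return 0   if timeout_ms.zero?

  now + (timeout_ms / 1_000.0)
end

deadline_for(nil)        # => nil   (no timeout configured)
deadline_for(0)          # => 0     (sentinel passes through)
deadline_for(2_500, 100) # => 102.5 (2.5 seconds past "now")
```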

#can_hint?(connection) ⇒ true | false (private)

Loop through the requests and check whether each operation is allowed to send a hint for the given server version.

For the following operations the client can send a hint on all supported server versions; for all other operations it can only send a hint to servers 4.4+:

- updateOne
- updateMany
- replaceOne

Parameters:

  • connection (Connection)

    The connection object.

Returns:

  • (true | false)

    Whether the request is able to send hints for the current server version.

# File 'lib/mongo/bulk_write.rb', line 333

def can_hint?(connection)
  gte_4_4 = connection.server.description.server_version_gte?('4.4')
  op_combiner.requests.all? do |req|
    op = req.keys.first
    if req[op].keys.include?(:hint)
      %i[update_one update_many replace_one].include?(op) || gte_4_4
    else
      true
    end
  end
end
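
The check can be reproduced outside the driver: every request that carries a `:hint` must either be an update/replace operation (hintable on all supported servers) or be destined for a 4.4+ server. A simplified sketch (names and request shapes here are illustrative):

```ruby
ALWAYS_HINTABLE = %i[update_one update_many replace_one].freeze

# true if every hinted request is allowed to carry its hint.
def hints_allowed?(requests, server_gte_4_4)
  requests.all? do |req|
    op = req.keys.first
    next true unless req[op].key?(:hint)

    ALWAYS_HINTABLE.include?(op) || server_gte_4_4
  end
end

reqs = [
  { update_one: { filter: {}, update: { '$set' => { a: 1 } }, hint: '_id_' } },
  { delete_one: { filter: {}, hint: '_id_' } },
]
hints_allowed?(reqs, false) # => false (the hinted delete needs 4.4+)
hints_allowed?(reqs, true)  # => true
```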

#delete_many(documents, connection, context, operation_id, session, _txn_num) (private)

# File 'lib/mongo/bulk_write.rb', line 284

def delete_many(documents, connection, context, operation_id, session, _txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(deletes: documents)
  Operation::Delete.new(spec).bulk_execute(connection, context: context)
end

#delete_one(documents, connection, context, operation_id, session, txn_num) (private)

# File 'lib/mongo/bulk_write.rb', line 277

def delete_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(deletes: documents, txn_num: txn_num)
  Operation::Delete.new(spec).bulk_execute(connection, context: context)
end

#execute ⇒ Mongo::BulkWrite::Result

Execute the bulk write operation.

Examples:

Execute the bulk write.

bulk_write.execute

Returns:

  • (Mongo::BulkWrite::Result)

    The result of the bulk write.

Since:

  • 2.1.0

# File 'lib/mongo/bulk_write.rb', line 57

def execute
  operation_id = Monitoring.next_operation_id
  result_combiner = ResultCombiner.new
  operations = op_combiner.combine
  validate_requests!
  deadline = calculate_deadline

  client.with_session(@options) do |session|
    operations.each do |operation|
      context = Operation::Context.new(
        client: client,
        session: session,
        operation_timeouts: operation_timeouts(deadline)
      )
      if single_statement?(operation)
        write_concern = write_concern(session)
        write_with_retry(write_concern, context: context) do |connection, txn_num, context|
          execute_operation(
            operation.keys.first,
            operation.values.flatten,
            connection,
            context,
            operation_id,
            result_combiner,
            session,
            txn_num
          )
        end
      else
        nro_write_with_retry(write_concern, context: context) do |connection, _txn_num, context|
          execute_operation(
            operation.keys.first,
            operation.values.flatten,
            connection,
            context,
            operation_id,
            result_combiner,
            session
          )
        end
      end
    end
  end
  result_combiner.result
end

#execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil) (private)

# File 'lib/mongo/bulk_write.rb', line 238

def execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil)
  validate_hint!(connection)

  unpin_maybe(session, connection) do
    if values.size > connection.description.max_write_batch_size
      split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
    else
      result = send(name, values, connection, context, operation_id, session, txn_num)

      add_server_diagnostics(connection) do
        add_error_labels(connection, context) do
          result_combiner.combine!(result, values.size)
        end
      end
    end
  end
# The size of each section in the message
# is independently capped at 16m and each bulk operation becomes
# its own section. The size of the entire bulk write is limited to 48m.
rescue Error::MaxBSONSize, Error::MaxMessageSize => e
  raise e if values.size <= 1

  unpin_maybe(session, connection) do
    split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
  end
end

#insert_one(documents, connection, context, operation_id, session, txn_num) (private)

# File 'lib/mongo/bulk_write.rb', line 291

def insert_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(documents: documents, txn_num: txn_num)
  Operation::Insert.new(spec).bulk_execute(connection, context: context)
end

#maybe_first(obj) ⇒ Object (private)

If the given object is an array return the first element, otherwise return the given object.

Parameters:

  • obj (Object)

    The given object.

Returns:

  • (Object)

    The first element of the array or the given object.

# File 'lib/mongo/bulk_write.rb', line 391

def maybe_first(obj)
  obj.is_a?(Array) ? obj.first : obj
end

#op_combiner (private)

# File 'lib/mongo/bulk_write.rb', line 265

def op_combiner
  @op_combiner ||= ordered? ? OrderedCombiner.new(requests) : UnorderedCombiner.new(requests)
end
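
The two combiners differ in how aggressively they merge: the ordered combiner may only merge runs of adjacent same-type requests (order must be preserved), while the unordered combiner can group across the whole list. A hypothetical, simplified model of the ordered case using `chunk_while`:

```ruby
# Merge adjacent requests of the same type into one operation,
# preserving overall order (simplified model of an ordered combiner).
def ordered_combine(requests)
  requests.chunk_while { |a, b| a.keys.first == b.keys.first }
          .map do |run|
            op = run.first.keys.first
            { op => run.map { |r| r[op] } }
          end
end

requests = [
  { insert_one: { a: 1 } },
  { insert_one: { a: 2 } },
  { delete_one: { filter: { a: 1 } } },
]
ordered_combine(requests)
# => [{ insert_one: [{ a: 1 }, { a: 2 }] }, { delete_one: [{ filter: { a: 1 } }] }]
```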

#op_timeout_ms(deadline) ⇒ Integer | nil (private)

Parameters:

  • deadline (Float | nil)

    Deadline for the batch of operations.

Returns:

  • (Integer | nil)

    Timeout in milliseconds for the next operation.

# File 'lib/mongo/bulk_write.rb', line 207

def op_timeout_ms(deadline)
  return nil if deadline.nil?

  if deadline == 0
    0
  else
    ((deadline - Utils.monotonic_time) * 1_000).to_i
  end
end
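
This is the inverse of the deadline computation: it turns the absolute monotonic deadline back into a per-operation millisecond budget, so each operation in the batch sees only the time remaining. A standalone sketch (illustrative name, not driver API):

```ruby
# Remaining time for the next operation, in ms, given an absolute
# monotonic deadline (0 is the "no timeout" sentinel, nil means unset).
def remaining_ms(deadline, now)
  return nil if deadline.nil?
  return 0   if deadline == 0

  ((deadline - now) * 1_000).to_i
end

remaining_ms(102.5, 100) # => 2500 (2.5 seconds left, as ms)
remaining_ms(0, 100)     # => 0    (sentinel passes through)
```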

#operation_timeouts(deadline) (private)

Returns the operation_timeouts hash for creating an Operation::Context. Uses operation_timeout_ms when the timeout was explicitly set on the bulk write, or inherited_timeout_ms when it comes from the collection or client. This distinction is important inside transactions where operation_timeout_ms is not allowed (RUBY-3685).

# File 'lib/mongo/bulk_write.rb', line 193

def operation_timeouts(deadline)
  timeout = op_timeout_ms(deadline)
  return {} if timeout.nil?

  if @options[:timeout_ms]
    { operation_timeout_ms: timeout }
  else
    { inherited_timeout_ms: timeout }
  end
end
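
The key choice encodes where the timeout came from: `:operation_timeout_ms` when set directly on the bulk write, `:inherited_timeout_ms` when inherited from the collection or client, which matters inside transactions (RUBY-3685). A tiny sketch of just that selection (illustrative name and boolean parameter):

```ruby
# Pick the timeout key based on whether timeout_ms was set explicitly
# on the bulk write (true) or inherited from collection/client (false).
def timeouts_hash(timeout, explicitly_set)
  return {} if timeout.nil?

  explicitly_set ? { operation_timeout_ms: timeout } : { inherited_timeout_ms: timeout }
end

timeouts_hash(2500, true)  # => { operation_timeout_ms: 2500 }
timeouts_hash(2500, false) # => { inherited_timeout_ms: 2500 }
```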

#replace_one(documents, connection, context, operation_id, session, txn_num) (private)

Alias for #update_one.

# File 'lib/mongo/bulk_write.rb', line 304

alias replace_one update_one

#single_statement?(operation) ⇒ Boolean (private)

# File 'lib/mongo/bulk_write.rb', line 217

def single_statement?(operation)
  SINGLE_STATEMENT_OPS.include?(operation.keys.first)
end

#split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num) (private)

# File 'lib/mongo/bulk_write.rb', line 269

def split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
  execute_operation(name, values.shift(values.size / 2), connection, context, operation_id, result_combiner,
                    session, txn_num)

  txn_num = session.next_txn_num if txn_num && !session.in_transaction?
  execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num)
end
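
Because `shift(values.size / 2)` mutates the array in place, the first recursive call processes the front half and the second call sees only the remainder; oversized batches are repeatedly halved until each piece fits. A driver-independent sketch of the resulting sub-batch sizes (`max` stands in for the server's `max_write_batch_size`; the method name is illustrative):

```ruby
# Recursively halve a batch until every piece is within max,
# returning the size of each executed sub-batch, in order.
def split_sizes(values, max)
  return [values.size] if values.size <= max

  front = values.shift(values.size / 2) # mutates values in place
  split_sizes(front, max) + split_sizes(values, max)
end

split_sizes((1..10).to_a, 3) # => [2, 3, 2, 3]
```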

#update_many(documents, connection, context, operation_id, session, _txn_num) (private)

# File 'lib/mongo/bulk_write.rb', line 306

def update_many(documents, connection, context, operation_id, session, _txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(updates: documents)
  Operation::Update.new(spec).bulk_execute(connection, context: context)
end

#update_one(documents, connection, context, operation_id, session, txn_num) (private) Also known as: #replace_one

# File 'lib/mongo/bulk_write.rb', line 298

def update_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(updates: documents, txn_num: txn_num)
  Operation::Update.new(spec).bulk_execute(connection, context: context)
end

#validate_hint!(connection) (private)

# File 'lib/mongo/bulk_write.rb', line 313

def validate_hint!(connection)
  return unless op_combiner.has_hint?
  return unless !can_hint?(connection) && write_concern && !write_concern.acknowledged?

  raise Error::UnsupportedOption.hint_error(unacknowledged_write: true)
end

#validate_requests! (private)

Perform the request document validation required by driver specifications. This method validates the first key of each update request document to be an operator (i.e. start with $) and the first key of each replacement document to not be an operator (i.e. not start with $). The request document may be invalid without this method flagging it as such (for example an update or replacement document containing some keys which are operators and some which are not), in which case the driver expects the server to fail the operation with an error.

Raise an ArgumentError if requests is empty.

Raises:

  • (ArgumentError)

    If requests is empty.

  • (Error::InvalidUpdateDocument, Error::InvalidReplacementDocument)

    If a request document fails the first-key validation and Mongo.validate_update_replace is set.

# File 'lib/mongo/bulk_write.rb', line 359

def validate_requests!
  requests_empty = true
  @requests.each do |req|
    requests_empty = false
    if op = req.keys.first
      if %i[update_one update_many].include?(op)
        if (doc = maybe_first(req.dig(op, :update))) && (key = doc.keys&.first) && !key.to_s.start_with?('$')
          raise Error::InvalidUpdateDocument.new(key: key) if Mongo.validate_update_replace

          Error::InvalidUpdateDocument.warn(Logger.logger, key)

        end
      elsif op == :replace_one
        if (key = req.dig(op, :replacement)&.keys&.first) && key.to_s.start_with?('$')
          raise Error::InvalidReplacementDocument.new(key: key) if Mongo.validate_update_replace

          Error::InvalidReplacementDocument.warn(Logger.logger, key)

        end
      end
    end
  end.tap do
    raise ArgumentError, 'Bulk write requests cannot be empty' if requests_empty
  end
end
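
The first-key rule can be checked in isolation: update documents must lead with an operator key (starting with `$`), replacement documents must not, and mixed documents are left for the server to reject. A simplified sketch of just that check (illustrative name, not driver API):

```ruby
# First-key validation: updates must start with a '$' operator,
# replacements must not.
def first_key_operator?(doc)
  key = doc.keys.first
  !key.nil? && key.to_s.start_with?('$')
end

first_key_operator?('$set' => { a: 1 }) # => true  (valid update shape)
first_key_operator?('a' => 1)           # => false (valid replacement shape)
```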

#write_concern(session = nil) ⇒ WriteConcern

This method is for internal use only.

Get the write concern for the bulk write.

Examples:

Get the write concern.

bulk_write.write_concern

Returns:

  • (WriteConcern)

    The write concern.

Since:

  • 2.1.0

# File 'lib/mongo/bulk_write.rb', line 162

def write_concern(session = nil)
  @write_concern ||= if options[:write_concern]
                       WriteConcern.get(options[:write_concern])
                     else
                       collection.write_concern_with_session(session)
                     end
end