
Class: Mongo::BulkWrite

Constant Summary

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Operation::ResponseHandling - Included

#add_error_labels

Adds error labels to exceptions raised in the yielded-to block, which should perform MongoDB operations and raise Mongo::Errors on failure.

#add_server_diagnostics

Yields to the block and, if the block raises an exception, adds a note to the exception with the address of the specified server.

#maybe_add_retryable_write_error_label!

A method that will add the RetryableWriteError label to an error if any of several retryability conditions are true.

#unpin_maybe

Unpins the session and/or the connection if the yielded-to block raises an error that requires unpinning the session and connection.

#validate_result

Instance Attribute Details

#collection ⇒ Mongo::Collection (readonly)

Returns:

  • (Mongo::Collection)

    The collection.
# File 'lib/mongo/bulk_write.rb', line 32

attr_reader :collection

#options ⇒ Hash, BSON::Document (readonly)

Returns:

  • (Hash, BSON::Document)

    options The options.

# File 'lib/mongo/bulk_write.rb', line 38

attr_reader :options

#ordered? ⇒ true, false (readonly)

This method is for internal use only.

Is the bulk write ordered?

Examples:

Is the bulk write ordered?

bulk_write.ordered?

Returns:

  • (true, false)

Whether the bulk write is ordered.

Since:

  • 2.1.0

# File 'lib/mongo/bulk_write.rb', line 147

def ordered?
  @ordered ||= options.fetch(:ordered, true)
end
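
The default resolution can be illustrated with a plain Hash standing in for the options (values here are hypothetical):

```ruby
# Hash#fetch with a default mirrors how ordered? resolves its value:
# an explicit option wins, and a missing :ordered key falls back to true.
options = { ordered: false }
explicit = options.fetch(:ordered, true)  # the explicit false is used

options = {}
default = options.fetch(:ordered, true)   # no key, so the default applies
```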

#requests ⇒ Array<Hash, BSON::Document> (readonly)

Returns:

  • (Array<Hash, BSON::Document>)

    requests The requests.

# File 'lib/mongo/bulk_write.rb', line 35

attr_reader :requests

Instance Method Details

#base_spec(operation_id, session) (private)

# File 'lib/mongo/bulk_write.rb', line 218

def base_spec(operation_id, session)
  {
    :db_name => database.name,
    :coll_name => collection.name,
    :write_concern => write_concern(session),
    :ordered => ordered?,
    :operation_id => operation_id,
    :bypass_document_validation => !!options[:bypass_document_validation],
    :max_time_ms => options[:max_time_ms],
    :options => options,
    :id_generator => client.options[:id_generator],
    :session => session,
    :comment => options[:comment],
    :let => options[:let],
  }
end
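
Each operation type then merges its own payload onto this shared base, as the delete/insert/update helpers below do. A minimal sketch with hypothetical values:

```ruby
# A trimmed-down base spec (hypothetical values); the real one carries
# more keys, but every bulk operation builds on it the same way.
base_spec = {
  db_name:   "test",
  coll_name: "users",
  ordered:   true,
}

# A delete_one-style operation adds its :deletes payload via Hash#merge,
# leaving the shared keys intact.
spec = base_spec.merge(deletes: [{ q: { _id: 1 }, limit: 1 }])
```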

#calculate_deadline ⇒ Float | nil (private)

Returns:

  • (Float | nil)

    Deadline for the batch of operations, if set.

# File 'lib/mongo/bulk_write.rb', line 174

def calculate_deadline
  timeout_ms = @options[:timeout_ms] || collection.timeout_ms
  return nil if timeout_ms.nil?

  if timeout_ms == 0
    0
  else
    Utils.monotonic_time + (timeout_ms / 1_000.0)
  end
end
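
The arithmetic can be sketched in isolation, assuming a monotonic clock comparable to Utils.monotonic_time (the helper name here is illustrative):

```ruby
# timeout_ms == 0 disables the timeout and is passed through literally;
# any positive value becomes an absolute monotonic-clock deadline.
def sketch_deadline(timeout_ms)
  return nil if timeout_ms.nil?
  return 0 if timeout_ms == 0

  Process.clock_gettime(Process::CLOCK_MONOTONIC) + (timeout_ms / 1_000.0)
end
```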

#can_hint?(connection) ⇒ true | false (private)

Loops through the requests and checks whether each operation is allowed to send a hint to the server at its reported version.

For the following operations the client can send a hint on all supported server versions; for every other operation, a hint can only be sent to server 4.4+:

- updateOne
- updateMany
- replaceOne

Parameters:

  • connection (Connection)

    The connection object.

Returns:

  • (true | false)

Whether the requests are able to send hints for the current server version.

# File 'lib/mongo/bulk_write.rb', line 331

def can_hint?(connection)
  gte_4_4 = connection.server.description.server_version_gte?('4.4')
  op_combiner.requests.all? do |req|
    op = req.keys.first
    if req[op].keys.include?(:hint)
      if [:update_one, :update_many, :replace_one].include?(op)
        true
      else
        gte_4_4
      end
    else
      true
    end
  end
end
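
The same check can be exercised standalone with sample request documents (the shapes and helper name here are illustrative) and the server-version flag made explicit:

```ruby
# update/replace operations may always carry a hint; every other
# operation type may only do so against a 4.4+ server.
def sketch_can_hint?(requests, gte_4_4)
  requests.all? do |req|
    op = req.keys.first
    next true unless req[op].key?(:hint)

    [:update_one, :update_many, :replace_one].include?(op) || gte_4_4
  end
end

requests = [
  { update_one: { filter: { a: 1 }, update: { "$set" => { a: 2 } }, hint: "a_1" } },
  { delete_one: { filter: { a: 1 }, hint: "a_1" } },
]
```

With these requests, hints are acceptable only on 4.4+, because the delete carries a hint.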

#delete_many(documents, connection, context, operation_id, session, txn_num) (private)

# File 'lib/mongo/bulk_write.rb', line 279

def delete_many(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(:deletes => documents)
  Operation::Delete.new(spec).bulk_execute(connection, context: context)
end

#delete_one(documents, connection, context, operation_id, session, txn_num) (private)

# File 'lib/mongo/bulk_write.rb', line 272

def delete_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(:deletes => documents, :txn_num => txn_num)
  Operation::Delete.new(spec).bulk_execute(connection, context: context)
end

#execute ⇒ Mongo::BulkWrite::Result

Execute the bulk write operation.

Examples:

Execute the bulk write.

bulk_write.execute

Returns:

  • (Mongo::BulkWrite::Result)

    The result.
Since:

  • 2.1.0

# File 'lib/mongo/bulk_write.rb', line 58

def execute
  operation_id = Monitoring.next_operation_id
  result_combiner = ResultCombiner.new
  operations = op_combiner.combine
  validate_requests!
  deadline = calculate_deadline

  client.with_session(@options) do |session|
    operations.each do |operation|
      context = Operation::Context.new(
        client: client,
        session: session,
        operation_timeouts: operation_timeouts(deadline)
      )
      if single_statement?(operation)
        write_concern = write_concern(session)
        write_with_retry(write_concern, context: context) do |connection, txn_num, context|
          execute_operation(
            operation.keys.first,
            operation.values.flatten,
            connection,
            context,
            operation_id,
            result_combiner,
            session,
            txn_num)
        end
      else
        nro_write_with_retry(write_concern, context: context) do |connection, txn_num, context|
          execute_operation(
            operation.keys.first,
            operation.values.flatten,
            connection,
            context,
            operation_id,
            result_combiner,
            session)
        end
      end
    end
  end
  result_combiner.result
end

#execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil) (private)

# File 'lib/mongo/bulk_write.rb', line 235

def execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil)
  validate_hint!(connection)

  unpin_maybe(session, connection) do
    if values.size > connection.description.max_write_batch_size
      split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
    else
      result = send(name, values, connection, context, operation_id, session, txn_num)

      add_server_diagnostics(connection) do
        add_error_labels(connection, context) do
          result_combiner.combine!(result, values.size)
        end
      end
    end
  end
# The size of each section in the message
# is independently capped at 16m and each bulk operation becomes
# its own section. The size of the entire bulk write is limited to 48m.
rescue Error::MaxBSONSize, Error::MaxMessageSize => e
  raise e if values.size <= 1
  unpin_maybe(session, connection) do
    split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
  end
end

#insert_one(documents, connection, context, operation_id, session, txn_num) (private)

# File 'lib/mongo/bulk_write.rb', line 286

def insert_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(:documents => documents, :txn_num => txn_num)
  Operation::Insert.new(spec).bulk_execute(connection, context: context)
end

#maybe_first(obj) ⇒ Object (private)

If the given object is an array, return its first element; otherwise return the object itself.

Parameters:

  • obj (Object)

    The given object.

Returns:

  • (Object)

    The first element of the array or the given object.

# File 'lib/mongo/bulk_write.rb', line 401

def maybe_first(obj)
  obj.is_a?(Array) ? obj.first : obj
end
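
Copied verbatim, the helper behaves like this (it is used by #validate_requests!, presumably so that pipeline-style updates, which arrive as arrays of stages, are checked by their first stage):

```ruby
def maybe_first(obj)
  obj.is_a?(Array) ? obj.first : obj
end

maybe_first([{ "$set" => { a: 1 } }, { "$unset" => "b" }])  # first pipeline stage
maybe_first({ "$set" => { a: 1 } })                         # passed through unchanged
```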

#op_combiner (private)

# File 'lib/mongo/bulk_write.rb', line 261

def op_combiner
  @op_combiner ||= ordered? ? OrderedCombiner.new(requests) : UnorderedCombiner.new(requests)
end

#op_timeout_ms(deadline) ⇒ Integer | nil (private)

Parameters:

  • deadline (Float | nil)

    Deadline for the batch of operations.

Returns:

  • (Integer | nil)

    Timeout in milliseconds for the next operation.

# File 'lib/mongo/bulk_write.rb', line 204

def op_timeout_ms(deadline)
  return nil if deadline.nil?

  if deadline == 0
    0
  else
    ((deadline - Utils.monotonic_time) * 1_000).to_i
  end
end
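
With the clock reading passed in explicitly, the conversion is easy to check (a sketch; the driver reads Utils.monotonic_time internally):

```ruby
# A deadline of 0 again means "no timeout"; otherwise the remaining
# budget is the distance to the deadline, converted to whole milliseconds.
def sketch_op_timeout_ms(deadline, now)
  return nil if deadline.nil?
  return 0 if deadline == 0

  ((deadline - now) * 1_000).to_i
end
```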

#operation_timeouts(deadline) (private)

Returns the operation_timeouts hash for creating an Operation::Context. Uses operation_timeout_ms when the timeout was explicitly set on the bulk write, or inherited_timeout_ms when it comes from the collection or client. This distinction is important inside transactions where operation_timeout_ms is not allowed (RUBY-3685).

# File 'lib/mongo/bulk_write.rb', line 190

def operation_timeouts(deadline)
  timeout = op_timeout_ms(deadline)
  return {} if timeout.nil?

  if @options[:timeout_ms]
    { operation_timeout_ms: timeout }
  else
    { inherited_timeout_ms: timeout }
  end
end
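
The key selection can be sketched with the two inputs made explicit (parameter names are illustrative):

```ruby
# An explicitly-set timeout becomes operation_timeout_ms; a timeout
# inherited from the collection or client becomes inherited_timeout_ms,
# the form permitted inside transactions.
def sketch_operation_timeouts(timeout_ms, explicitly_set)
  return {} if timeout_ms.nil?

  if explicitly_set
    { operation_timeout_ms: timeout_ms }
  else
    { inherited_timeout_ms: timeout_ms }
  end
end
```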

#replace_one(documents, connection, context, operation_id, session, txn_num) (private)

Alias for #update_one.

# File 'lib/mongo/bulk_write.rb', line 299

alias :replace_one :update_one

#single_statement?(operation) ⇒ Boolean (private)

# File 'lib/mongo/bulk_write.rb', line 214

def single_statement?(operation)
  SINGLE_STATEMENT_OPS.include?(operation.keys.first)
end

#split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num) (private)

# File 'lib/mongo/bulk_write.rb', line 265

def split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
  execute_operation(name, values.shift(values.size / 2), connection, context, operation_id, result_combiner, session, txn_num)

  txn_num = session.next_txn_num if txn_num && !session.in_transaction?
  execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num)
end
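
The halving relies on Array#shift(n) mutating the receiver, so the second recursive call sees only the remaining half:

```ruby
# shift(n) removes the first n elements in place and returns them,
# splitting an oversized batch into two roughly equal halves.
values = [:a, :b, :c, :d, :e]
first_half = values.shift(values.size / 2)
# first_half is [:a, :b]; values is now [:c, :d, :e]
```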

#update_many(documents, connection, context, operation_id, session, txn_num) (private)

# File 'lib/mongo/bulk_write.rb', line 301

def update_many(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(:updates => documents)
  Operation::Update.new(spec).bulk_execute(connection, context: context)
end

#update_one(documents, connection, context, operation_id, session, txn_num) (private) Also known as: #replace_one

# File 'lib/mongo/bulk_write.rb', line 293

def update_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(:updates => documents, :txn_num => txn_num)
  Operation::Update.new(spec).bulk_execute(connection, context: context)
end

#validate_hint!(connection) (private)

# File 'lib/mongo/bulk_write.rb', line 310

def validate_hint!(connection)
  if op_combiner.has_hint?
    if !can_hint?(connection) && write_concern && !write_concern.acknowledged?
      raise Error::UnsupportedOption.hint_error(unacknowledged_write: true)
    end
  end
end

#validate_requests! (private)

Perform the request document validation required by driver specifications. This method validates that the first key of each update request document is an operator (i.e. starts with $) and that the first key of each replacement document is not an operator (i.e. does not start with $). A request document may still be invalid without this method flagging it as such (for example, an update or replacement document containing some keys that are operators and some that are not); in that case the driver expects the server to fail the operation with an error.

Raises an ArgumentError if requests is empty.

Raises:

  • (ArgumentError)

    If requests is empty.
# File 'lib/mongo/bulk_write.rb', line 361

def validate_requests!
  requests_empty = true
  @requests.each do |req|
    requests_empty = false
    if op = req.keys.first
      if [:update_one, :update_many].include?(op)
        if doc = maybe_first(req.dig(op, :update))
          if key = doc.keys&.first
            unless key.to_s.start_with?("$")
              if Mongo.validate_update_replace
                raise Error::InvalidUpdateDocument.new(key: key)
              else
                Error::InvalidUpdateDocument.warn(Logger.logger, key)
              end
            end
          end
        end
      elsif op == :replace_one
        if key = req.dig(op, :replacement)&.keys&.first
          if key.to_s.start_with?("$")
            if Mongo.validate_update_replace
              raise Error::InvalidReplacementDocument.new(key: key)
            else
              Error::InvalidReplacementDocument.warn(Logger.logger, key)
            end
          end
        end
      end
    end
  end.tap do
    raise ArgumentError, "Bulk write requests cannot be empty" if requests_empty
  end
end
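
The $-prefix rule the method enforces can be shown on its own (the helper name is illustrative):

```ruby
# Update documents must lead with an operator key ("$set", "$inc", ...);
# replacement documents must not.
def operator_first_key?(doc)
  doc.keys.first.to_s.start_with?("$")
end

valid_update      = { "$set" => { name: "x" } }
valid_replacement = { name: "x" }
```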

#write_concern(session = nil) ⇒ WriteConcern

This method is for internal use only.

Get the write concern for the bulk write.

Examples:

Get the write concern.

bulk_write.write_concern

Returns:

  • (WriteConcern)

    The write concern.
Since:

  • 2.1.0

# File 'lib/mongo/bulk_write.rb', line 161

def write_concern(session = nil)
  @write_concern ||= options[:write_concern] ?
    WriteConcern.get(options[:write_concern]) :
    collection.write_concern_with_session(session)
end