123456789_123456789_123456789_123456789_123456789_

Class: Mongo::BulkWrite

Constant Summary

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Operation::ResponseHandling - Included

#add_error_labels

Adds error labels to exceptions raised in the yielded to block, which should perform MongoDB operations and raise Mongo::Errors on failure.

#add_server_diagnostics

Yields to the block and, if the block raises an exception, adds a note to the exception with the address of the specified server.

#maybe_add_retryable_write_error_label!

A method that will add the RetryableWriteError label to an error if any of the following conditions are true:

#unpin_maybe

Unpins the session and/or the connection if the yielded to block raises errors that are required to unpin the session and the connection.

#validate_result

Instance Attribute Details

#collectionMongo::Collection (readonly)

Returns:

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 32

attr_reader :collection

#optionsHash, BSON::Document (readonly)

Returns:

  • (Hash, BSON::Document)

    options The options.

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 38

attr_reader :options

#ordered?true, false (readonly)

This method is for internal use only.

Is the bulk write ordered?

Examples:

Is the bulk write ordered?

bulk_write.ordered?

Returns:

  • (true, false)

    If the bulk write is ordered.

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 147

def ordered?
  @ordered ||= options.fetch(:ordered, true)
end

#requestsArray<Hash, BSON::Document> (readonly)

Returns:

  • (Array<Hash, BSON::Document>)

    requests The requests.

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 35

attr_reader :requests

Instance Method Details

#base_spec(operation_id, session) (private)

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 202

def base_spec(operation_id, session)
  {
    :db_name => database.name,
    :coll_name => collection.name,
    :write_concern => write_concern(session),
    :ordered => ordered?,
    :operation_id => operation_id,
    :bypass_document_validation => !!options[:bypass_document_validation],
    :max_time_ms => options[:max_time_ms],
    :options => options,
    :id_generator => client.options[:id_generator],
    :session => session,
    :comment => options[:comment],
    :let => options[:let],
  }
end

#calculate_deadlineFloat | nil (private)

Returns:

  • (Float | nil)

    Deadline for the batch of operations, if set.

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 174

def calculate_deadline
  timeout_ms = @options[:timeout_ms] || collection.timeout_ms
  return nil if timeout_ms.nil?

  if timeout_ms == 0
    0
  else
    Utils.monotonic_time + (timeout_ms / 1_000.0)
  end
end

#can_hint?(connection) ⇒ true | false (private)

Loop through the requests and check if each operation is allowed to send a hint for each operation on the given server version.

For the following operations, the client can send a hint for servers >= 4.2 and for the rest, the client can only send it for 4.4+:

- updateOne
- updateMany
- replaceOne

Parameters:

  • connection (Connection)

    The connection object.

Returns:

  • (true | false)

    Whether the request is able to send hints for the current server version.

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 335

def can_hint?(connection)
  gte_4_2 = connection.server.description.server_version_gte?('4.2')
  gte_4_4 = connection.server.description.server_version_gte?('4.4')
  op_combiner.requests.all? do |req|
    op = req.keys.first
    if req[op].keys.include?(:hint)
      if [:update_one, :update_many, :replace_one].include?(op)
        gte_4_2
      else
        gte_4_4
      end
    else
      true
    end
  end
end

#delete_many(documents, connection, context, operation_id, session, txn_num) (private)

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 269

def delete_many(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(:deletes => documents)
  Operation::Delete.new(spec).bulk_execute(connection, context: context)
end

#delete_one(documents, connection, context, operation_id, session, txn_num) (private)

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 262

def delete_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(:deletes => documents, :txn_num => txn_num)
  Operation::Delete.new(spec).bulk_execute(connection, context: context)
end

#executeMongo::BulkWrite::Result

Execute the bulk write operation.

Examples:

Execute the bulk write.

bulk_write.execute

Returns:

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 58

def execute
  operation_id = Monitoring.next_operation_id
  result_combiner = ResultCombiner.new
  operations = op_combiner.combine
  validate_requests!
  deadline = calculate_deadline

  client.with_session(@options) do |session|
    operations.each do |operation|
      context = Operation::Context.new(
        client: client,
        session: session,
        operation_timeouts: { operation_timeout_ms: op_timeout_ms(deadline) }
      )
      if single_statement?(operation)
        write_concern = write_concern(session)
        write_with_retry(write_concern, context: context) do |connection, txn_num, context|
          execute_operation(
            operation.keys.first,
            operation.values.flatten,
            connection,
            context,
            operation_id,
            result_combiner,
            session,
            txn_num)
        end
      else
        nro_write_with_retry(write_concern, context: context) do |connection, txn_num, context|
          execute_operation(
            operation.keys.first,
            operation.values.flatten,
            connection,
            context,
            operation_id,
            result_combiner,
            session)
        end
      end
    end
  end
  result_combiner.result
end

#execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil) (private)

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 219

def execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil)
  validate_collation!(connection)
  validate_array_filters!(connection)
  validate_hint!(connection)

  unpin_maybe(session, connection) do
    if values.size > connection.description.max_write_batch_size
      split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
    else
      result = send(name, values, connection, context, operation_id, session, txn_num)

      add_server_diagnostics(connection) do
        add_error_labels(connection, context) do
          result_combiner.combine!(result, values.size)
        end
      end
    end
  end
# With OP_MSG (3.6+ servers), the size of each section in the message
# is independently capped at 16m and each bulk operation becomes
# its own section. The size of the entire bulk write is limited to 48m.
# With OP_QUERY (pre-3.6 servers), the entire bulk write is sent as a
# single document and is thus subject to the 16m document size limit.
# This means the splits differ between pre-3.6 and 3.6+ servers, with
# 3.6+ servers being able to split less.
rescue Error::MaxBSONSize, Error::MaxMessageSize => e
  raise e if values.size <= 1
  unpin_maybe(session, connection) do
    split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
  end
end

#insert_one(documents, connection, context, operation_id, session, txn_num) (private)

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 276

def insert_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(:documents => documents, :txn_num => txn_num)
  Operation::Insert.new(spec).bulk_execute(connection, context: context)
end

#maybe_first(obj) ⇒ Object (private)

If the given object is an array return the first element, otherwise return the given object.

Parameters:

  • obj (Object)

    The given object.

Returns:

  • (Object)

    The first element of the array or the given object.

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 406

def maybe_first(obj)
  obj.is_a?(Array) ? obj.first : obj
end

#op_combiner (private)

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 251

def op_combiner
  @op_combiner ||= ordered? ? OrderedCombiner.new(requests) : UnorderedCombiner.new(requests)
end

#op_timeout_ms(deadline) ⇒ Integer | nil (private)

Parameters:

  • deadline (Float | nil)

    Deadline for the batch of operations.

Returns:

  • (Integer | nil)

    Timeout in milliseconds for the next operation.

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 188

def op_timeout_ms(deadline)
  return nil if deadline.nil?

  if deadline == 0
    0
  else
    ((deadline - Utils.monotonic_time) * 1_000).to_i
  end
end

#replace_one(documents, connection, context, operation_id, session, txn_num) (private)

Alias for #update_one.

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 289

alias :replace_one :update_one

#single_statement?(operation) ⇒ Boolean (private)

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 198

def single_statement?(operation)
  SINGLE_STATEMENT_OPS.include?(operation.keys.first)
end

#split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num) (private)

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 255

def split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
  execute_operation(name, values.shift(values.size / 2), connection, context, operation_id, result_combiner, session, txn_num)

  txn_num = session.next_txn_num if txn_num && !session.in_transaction?
  execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num)
end

#update_many(documents, connection, context, operation_id, session, txn_num) (private)

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 291

def update_many(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(:updates => documents)
  Operation::Update.new(spec).bulk_execute(connection, context: context)
end

#update_one(documents, connection, context, operation_id, session, txn_num) (private) Also known as: #replace_one

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 283

def update_one(documents, connection, context, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  spec = base_spec(operation_id, session).merge(:updates => documents, :txn_num => txn_num)
  Operation::Update.new(spec).bulk_execute(connection, context: context)
end

#validate_array_filters!(connection) (private)

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 306

def validate_array_filters!(connection)
  if op_combiner.has_array_filters? && !connection.features.array_filters_enabled?
    raise Error::UnsupportedArrayFilters.new
  end
end

#validate_collation!(connection) (private)

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 300

def validate_collation!(connection)
  if op_combiner.has_collation? && !connection.features.collation_enabled?
    raise Error::UnsupportedCollation.new
  end
end

#validate_hint!(connection) (private)

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 312

def validate_hint!(connection)
  if op_combiner.has_hint?
    if !can_hint?(connection) && write_concern && !write_concern.acknowledged?
      raise Error::UnsupportedOption.hint_error(unacknowledged_write: true)
    elsif !connection.features.update_delete_option_validation_enabled?
      raise Error::UnsupportedOption.hint_error
    end
  end
end

#validate_requests! (private)

Perform the request document validation required by driver specifications. This method validates the first key of each update request document to be an operator (i.e. start with $) and the first key of each replacement document to not be an operator (i.e. not start with $). The request document may be invalid without this method flagging it as such (for example an update or replacement document containing some keys which are operators and some which are not), in which case the driver expects the server to fail the operation with an error.

Raise an ArgumentError if requests is empty.

Raises:

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 366

def validate_requests!
  requests_empty = true
  @requests.each do |req|
    requests_empty = false
    if op = req.keys.first
      if [:update_one, :update_many].include?(op)
        if doc = maybe_first(req.dig(op, :update))
          if key = doc.keys&.first
            unless key.to_s.start_with?("$")
              if Mongo.validate_update_replace
                raise Error::InvalidUpdateDocument.new(key: key)
              else
                Error::InvalidUpdateDocument.warn(Logger.logger, key)
              end
            end
          end
        end
      elsif op == :replace_one
        if key = req.dig(op, :replacement)&.keys&.first
          if key.to_s.start_with?("$")
            if Mongo.validate_update_replace
              raise Error::InvalidReplacementDocument.new(key: key)
            else
              Error::InvalidReplacementDocument.warn(Logger.logger, key)
            end
          end
        end
      end
    end
  end.tap do
    raise ArgumentError, "Bulk write requests cannot be empty" if requests_empty
  end
end

#write_concern(session = nil) ⇒ WriteConcern

This method is for internal use only.

Get the write concern for the bulk write.

Examples:

Get the write concern.

bulk_write.write_concern

Returns:

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/bulk_write.rb', line 161

def write_concern(session = nil)
  @write_concern ||= options[:write_concern] ?
    WriteConcern.get(options[:write_concern]) :
    collection.write_concern_with_session(session)
end