123456789_123456789_123456789_123456789_123456789_

Class: Mongo::Protocol::Msg Private

Do not use. This class is for internal use only.
Relationships & Source Files
Namespace Children
Classes:
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
self, Message
Instance Chain:
Inherits: Mongo::Protocol::Message
Defined in: lib/mongo/protocol/msg.rb

Overview

MongoDB Wire protocol Msg message (OP_MSG), a bi-directional wire protocol opcode.

OP_MSG is only available in MongoDB 3.6 (maxWireVersion >= 6) and later.

Since:

  • 2.5.0

Constant Summary

Serializers - Included

HEADER_PACK, INT32_PACK, INT64_PACK, NULL, ZERO

Message - Inherited

BATCH_SIZE, COLLECTION, LIMIT, MAX_MESSAGE_SIZE, ORDERED, Q

::Mongo::Monitoring::Event::Secure - Included

REDACTED_COMMANDS

Class Method Summary

Message - Inherited

.deserialize

Deserializes messages from an IO stream.

.new

:nodoc:

.deserialize_array

Deserializes an array of fields in a message.

.deserialize_field

Deserializes a single field in a message.

.deserialize_header

Deserializes the header of the message.

.field

A method for declaring a message field.

.fields

A class method for getting the fields for a message class.

Instance Attribute Summary

Message - Inherited

#replyable?

The default for messages is not to require a reply after sending a message to the server.

#request_id

Returns the request id for the message.

Instance Method Summary

::Mongo::Monitoring::Event::Secure - Included

#compression_allowed?

Is compression allowed for a given command message.

#redacted

Redact secure information from the document if:

#sensitive?

Check whether the command is sensitive in terms of command monitoring spec.

Message - Inherited

#==

Tests for equality between two wire protocol messages by comparing class and field values.

#eql?

Alias for Message#==.

#hash

Creates a hash from the values of the fields of a message.

#maybe_add_server_api,
#maybe_compress

Compress the message, if supported by the wire protocol used and if the command being sent permits compression.

#maybe_decrypt

Possibly decrypt this message with libmongocrypt.

#maybe_encrypt

Possibly encrypt this message with libmongocrypt.

#maybe_inflate

Inflate a message if it is compressed.

#number_returned

Default number returned value for protocol messages.

#serialize

Serializes message into bytes that can be sent on the wire.

#set_request_id

Generates a request id for a message.

#to_s
#compress_if_possible

Compress the message, if the command being sent permits compression.

#fields

A method for getting the fields for a message class.

#merge_sections,
#serialize_fields

Serializes message fields into a buffer.

#serialize_header

Serializes the header of the message consisting of 4 32bit integers.

Instance Attribute Details

#bulk_write?Boolean (readonly)

Note:

This method was written to support client-side encryption functionality. It is not recommended that this method be used in service of any other feature or behavior.

Whether this message represents a bulk write. A bulk write is an insert, update, or delete operation that encompasses multiple operations of the same type.

Returns:

  • (Boolean)

    Whether this message represents a bulk write.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/msg.rb', line 272

def bulk_write?
  inserts = @main_document['documents']
  updates = @main_document['updates']
  deletes = @main_document['deletes']

  num_inserts = inserts && inserts.length || 0
  num_updates = updates && updates.length || 0
  num_deletes = deletes && deletes.length || 0

  num_inserts > 1  || num_updates > 1 || num_deletes > 1
end

#flagsArray<Symbol> (rw)

Returns:

  • (Array<Symbol>)

    The flags for this message.

[ GitHub ]

  
# File 'lib/mongo/protocol/msg.rb', line 401

field :flags, BitVector.new(FLAGS)

#replyable?true, false (readonly)

Whether the message expects a reply from the database.

Examples:

Does the message require a reply?

message.replyable?

Returns:

  • (true, false)

    If the message expects a reply.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/msg.rb', line 108

def replyable?
  @replyable ||= !flags.include?(:more_to_come)
end

Instance Method Details

#add_check_sum(buffer) (private)

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/msg.rb', line 354

def add_check_sum(buffer)
  if flags.include?(:checksum_present)
    #buffer.put_int32(checksum)
  end
end

#command (private)

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/msg.rb', line 341

def command
  @command ||= if @main_document
    @main_document.dup.tap do |cmd|
      @sequences.each do |section|
        cmd[section.identifier] ||= []
        cmd[section.identifier] += section.documents
      end
    end
  else
    documents.first
  end
end

#documents

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/msg.rb', line 196

def documents
  [@main_document]
end

#fix_after_deserialization

Reverse-populates the instance variables after deserialization sets the @sections instance variable to the list of documents.

TODO fix deserialization so that this method is not needed.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/msg.rb', line 184

def fix_after_deserialization
  if @sections.nil?
    raise NotImplementedError, "After deserializations @sections should have been initialized"
  end
  if @sections.length != 1
    raise NotImplementedError, "Deserialization must have produced exactly one section, but it produced #{sections.length} sections"
  end
  @main_document = @sections.first
  @sequences = []
  @sections = [{type: 0, payload: @main_document}]
end

#maybe_add_server_api(server_api)

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/msg.rb', line 284

def maybe_add_server_api(server_api)
  conflicts = {}
  %i(apiVersion apiStrict apiDeprecationErrors).each do |key|
    if @main_document.key?(key)
      conflicts[key] = @main_document[key]
    end
    if @main_document.key?(key.to_s)
      conflicts[key] = @main_document[key.to_s]
    end
  end
  unless conflicts.empty?
    raise Error::ServerApiConflict, "The Client is configured with :server_api option but the operation provided the following conflicting parameters: #{conflicts.inspect}"
  end

  main_document = @main_document.merge(
    Utils.transform_server_api(server_api)
  )
  Msg.new(@flags, @options, main_document, *@sequences)
end

#maybe_compress(compressor, zlib_compression_level = nil) ⇒ Message

Compress the message, if the command being sent permits compression. Otherwise returns self.

Parameters:

  • compressor (String, Symbol)

    The compressor to use.

  • zlib_compression_level (Integer) (defaults to: nil)

    The zlib compression level to use.

Returns:

  • (Message)

    A Protocol::Compressed message or self, depending on whether this message can be compressed.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/msg.rb', line 174

def maybe_compress(compressor, zlib_compression_level = nil)
  compress_if_possible(command.keys.first, compressor, zlib_compression_level)
end

#maybe_decrypt(context) ⇒ Msg

Possibly decrypt this message with libmongocrypt. Message will only be decrypted if the specified client exists, that client has been given auto-encryption options, and this message is eligible for decryption. A message is eligible for decryption if it represents one of the command types allow-listed by libmongocrypt and it contains data that is required to be encrypted by a local or remote json schema.

Parameters:

Returns:

  • (Msg)

    The decrypted message, or the original message if decryption was not possible or necessary.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/msg.rb', line 251

def maybe_decrypt(context)
  if context.decrypt?
    cmd = merge_sections
    enc_cmd = context.decrypt(cmd)
    Msg.new(@flags, @options, enc_cmd)
  else
    self
  end
end

#maybe_encrypt(connection, context) ⇒ Msg

Possibly encrypt this message with libmongocrypt. Message will only be encrypted if the specified client exists, that client has been given auto-encryption options, the client has not been instructed to bypass auto-encryption, and mongocryptd determines that this message is eligible for encryption. A message is eligible for encryption if it represents one of the command types allow-listed by libmongocrypt and it contains data that is required to be encrypted by a local or remote json schema.

Parameters:

Returns:

  • (Msg)

    The encrypted message, or the original message if encryption was not possible or necessary.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/msg.rb', line 214

def maybe_encrypt(connection, context)
  # TODO verify compression happens later, i.e. when this method runs
  # the message is not compressed.
  if context.encrypt?
    if connection.description.max_wire_version < 8
      raise Error::CryptError.new(
        "Cannot perform encryption against a MongoDB server older than " +
        "4.2 (wire version less than 8). Currently connected to server " +
        "with max wire version #{connection.description.max_wire_version}} " +
        "(Auto-encryption requires a minimum MongoDB version of 4.2)"
      )
    end

    db_name = @main_document[DATABASE_IDENTIFIER]
    cmd = merge_sections
    enc_cmd = context.encrypt(db_name, cmd)
    if cmd.key?('$db') && !enc_cmd.key?('$db')
      enc_cmd['$db'] = cmd['$db']
    end

    Msg.new(@flags, @options, enc_cmd)
  else
    self
  end
end

#number_returnedInteger

Returns the number of documents returned from the server.

The Msg instance must be for a server reply and the reply must return an active cursor (either a newly created one or one whose iteration is continuing via getMore).

Returns:

  • (Integer)

    Number of returned documents.

Raises:

  • (NotImplementedError)

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/msg.rb', line 311

def number_returned
  if doc = documents.first
    if cursor = doc['cursor']
      if batch = cursor['firstBatch'] || cursor['nextBatch']
        return batch.length
      end
    end
  end
  raise NotImplementedError, "number_returned is only defined for cursor replies"
end

#payloadBSON::Document

Return the event payload for monitoring.

Examples:

Return the event payload.

message.payload

Returns:

  • (BSON::Document)

    The event payload.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/msg.rb', line 120

def payload
  # Reorder keys in main_document for better logging - see
  # https://jira.mongodb.org/browse/RUBY-1591.
  # Note that even without the reordering, the payload is not an exact
  # match to what is sent over the wire because the command as used in
  # the published event combines keys from multiple sections of the
  # payload sent over the wire.
  ordered_command = {}
  skipped_command = {}
  command.each do |k, v|
    if INTERNAL_KEYS.member?(k.to_s)
      skipped_command[k] = v
    else
      ordered_command[k] = v
    end
  end
  ordered_command.update(skipped_command)

  BSON::Document.new(
    command_name: ordered_command.keys.first.to_s,
    database_name: @main_document[DATABASE_IDENTIFIER],
    command: ordered_command,
    request_id: request_id,
    reply: @main_document,
  )
end

#sectionsArray<Hash> | Array<BSON::Document>

The sections that will be serialized, or the documents have been deserialized.

Usually the sections contain OP_MSG-compliant sections derived from @main_document and @sequences. The information in @main_document and @sequences is duplicated in the sections.

When deserializing Msg instances, sections temporarily is an array of documents returned in the type 0 section of the OP_MSG wire protocol message. #fix_after_deserialization method mutates this object to have sections, @main_document and @sequences be what they would have been had the Msg instance been constructed using the constructor (rather than having been deserialized).

Returns:

  • (Array<Hash> | Array<BSON::Document>)

    The sections of payload type 1 or 0.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/msg.rb', line 420

field :sections, Sections

#serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil) ⇒ BSON::ByteBuffer

Serializes message into bytes that can be sent on the wire.

Parameters:

  • buffer (BSON::ByteBuffer) (defaults to: BSON::ByteBuffer.new)

    where the message should be inserted.

  • max_bson_size (Integer) (defaults to: nil)

    The maximum bson object size.

Returns:

  • (BSON::ByteBuffer)

    buffer containing the serialized message.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/msg.rb', line 155

def serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil)
  validate_document_size!(max_bson_size)

  super
  add_check_sum(buffer)
  buffer
end

#validate_document_size!(max_bson_size) (private)

Validate that the documents in this message are all smaller than the maxBsonObjectSize. If not, raise an exception.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/msg.rb', line 326

def validate_document_size!(max_bson_size)
  max_bson_size ||= Mongo::Server::ConnectionBase::DEFAULT_MAX_BSON_OBJECT_SIZE

  contains_too_large_document = @sections.any? do |section|
    section[:type] == 1 &&
      section[:payload][:sequence].any? do |document|
        document.to_bson.length > max_bson_size
      end
  end

  if contains_too_large_document
    raise Error::MaxBSONSize.new('The document exceeds maximum allowed BSON object size after serialization')
  end
end