Class: Mongo::Protocol::Msg Private
| Relationships & Source Files | |
| Namespace Children | |
| Classes: | |
| Super Chains via Extension / Inclusion / Inheritance | |
| Class Chain: | self, Message |
| Instance Chain: | |
| Inherits: | Mongo::Protocol::Message |
| Defined in: | lib/mongo/protocol/msg.rb | 
Overview
MongoDB Wire protocol Msg message (OP_MSG), a bi-directional wire protocol opcode.
OP_MSG is only available in MongoDB 3.6 (maxWireVersion >= 6) and later.
Constant Summary
- 
    DATABASE_IDENTIFIER =
    # File 'lib/mongo/protocol/msg.rb', line 35
    The identifier for the database name to execute the command on.
    '$db'.freeze
- 
    FLAGS =
    # File 'lib/mongo/protocol/msg.rb', line 393
    Available flags for an OP_MSG message.
    Array.new(16).tap do |arr|
      arr[0] = :checksum_present
      arr[1] = :more_to_come
      arr[16] = :exhaust_allowed
    end.freeze
- 
    INTERNAL_KEYS =
    # File 'lib/mongo/protocol/msg.rb', line 41
    Keys that the driver adds to commands. These are going to be moved to the end of the hash for better logging.
    Set.new(%w($clusterTime $db lsid signature txnNumber)).freeze
- 
    KNOWN_FLAGS =
    # File 'lib/mongo/protocol/msg.rb', line 386
    {
      checksum_present: true,
      more_to_come: true,
      exhaust_allowed: true,
    }
- 
    OP_CODE =
    # File 'lib/mongo/protocol/msg.rb', line 384
    The operation code required to specify an OP_MSG message.
    2013
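The FLAGS table reads naturally as a bit-position lookup. The following is a minimal sketch, assuming (as the table layout suggests) that an entry's array index is its bit position in the 32-bit flags field. FLAG_NAMES, encode_flags and decode_flags are hypothetical names for illustration and are not part of the driver.

```ruby
# Hypothetical stand-in for the FLAGS table: index = assumed bit position.
FLAG_NAMES = Array.new(16).tap do |arr|
  arr[0] = :checksum_present
  arr[1] = :more_to_come
  arr[16] = :exhaust_allowed # assigning past the end grows the array to 17 slots
end.freeze

# Turn a list of flag symbols into an integer bit field.
def encode_flags(flags)
  flags.reduce(0) do |bits, flag|
    position = FLAG_NAMES.index(flag)
    raise ArgumentError, "unknown flag #{flag.inspect}" if position.nil?
    bits | (1 << position)
  end
end

# Turn an integer bit field back into the list of named flags that are set.
def decode_flags(bits)
  FLAG_NAMES.each_with_index
            .select { |name, position| name && bits[position] == 1 }
            .map(&:first)
end
```

Bits without a name in the table are ignored by decode_flags in this sketch.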
Serializers - Included
  HEADER_PACK, INT32_PACK, INT64_PACK, NULL, ZERO
Message - Inherited
  BATCH_SIZE, COLLECTION, LIMIT, MAX_MESSAGE_SIZE, ORDERED, Q
::Mongo::Monitoring::Event::Secure - Included
  
Class Method Summary
- 
    
      .new(flags, options, main_document, *sequences)  ⇒ Msg 
    
    constructor
    Internal use only
    Creates a new OP_MSG protocol message. 
Message - Inherited
| .deserialize | Deserializes messages from an IO stream. | 
| .new | :nodoc: | 
| .deserialize_array | Deserializes an array of fields in a message. | 
| .deserialize_field | Deserializes a single field in a message. | 
| .deserialize_header | Deserializes the header of the message. | 
| .field | A method for declaring a message field. | 
| .fields | A class method for getting the fields for a message class. | 
Instance Attribute Summary
- 
    
      #bulk_write?  ⇒ Boolean 
    
    readonly
    Internal use only
    Whether this message represents a bulk write. 
- #flags ⇒ Array<Symbol> rw
- 
    
      #replyable?  ⇒ true, false 
    
    readonly
    Internal use only
    Whether the message expects a reply from the database. 
Message - Inherited
| #replyable? | The default for messages is not to require a reply after sending a message to the server. | 
| #request_id | Returns the request id for the message. | 
Instance Method Summary
- #documents Internal use only
- 
    
      #fix_after_deserialization  
    
    Internal use only
    Reverse-populates the instance variables after deserialization sets the @sections instance variable to the list of documents. 
- #maybe_add_server_api(server_api) Internal use only
- 
    
      #maybe_compress(compressor, zlib_compression_level = nil)  ⇒ Message 
    
    Internal use only
    Compress the message, if the command being sent permits compression. 
- 
    
      #maybe_decrypt(context)  ⇒ Mongo::Protocol::Msg 
    
    Internal use only
    Possibly decrypt this message with libmongocrypt. 
- 
    
      #maybe_encrypt(connection, context)  ⇒ Mongo::Protocol::Msg 
    
    Internal use only
    Possibly encrypt this message with libmongocrypt. 
- 
    
      #number_returned  ⇒ Integer 
    
    Internal use only
    Returns the number of documents returned from the server. 
- 
    
      #payload  ⇒ BSON::Document 
    
    Internal use only
    Return the event payload for monitoring. 
- 
    
      #sections  ⇒ Array<Hash> | Array<BSON::Document> 
    
    Internal use only
    The sections that will be serialized, or the documents that have been deserialized. 
- 
    
      #serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil)  ⇒ BSON::ByteBuffer 
    
    Internal use only
    Serializes message into bytes that can be sent on the wire. 
- #add_check_sum(buffer) private Internal use only
- #command private Internal use only
- 
    
      #validate_document_size!(max_bson_size)  
    
    private
    Internal use only
    Validate that the documents in this message are all smaller than the maxBsonObjectSize. 
::Mongo::Monitoring::Event::Secure - Included
| #compression_allowed? | Is compression allowed for a given command message. | 
| #redacted | Redact secure information from the document if: | 
| #sensitive? | Check whether the command is sensitive in terms of command monitoring spec. | 
Message - Inherited
| #== | Tests for equality between two wire protocol messages by comparing class and field values. | 
| #eql? | Alias for Message#==. | 
| #hash | Creates a hash from the values of the fields of a message. | 
| #maybe_add_server_api, | |
| #maybe_compress | Compress the message, if supported by the wire protocol used and if the command being sent permits compression. | 
| #maybe_decrypt | Possibly decrypt this message with libmongocrypt. | 
| #maybe_encrypt | Possibly encrypt this message with libmongocrypt. | 
| #maybe_inflate | Inflate a message if it is compressed. | 
| #number_returned | Default number returned value for protocol messages. | 
| #serialize | Serializes message into bytes that can be sent on the wire. | 
| #set_request_id | Generates a request id for a message. | 
| #to_s | Alias for Message#serialize. | 
| #compress_if_possible | Compress the message, if the command being sent permits compression. | 
| #fields | A method for getting the fields for a message class. | 
| #merge_sections, | |
| #serialize_fields | Serializes message fields into a buffer. | 
| #serialize_header | Serializes the header of the message consisting of 4 32bit integers. | 
Instance Attribute Details
    #bulk_write?  ⇒ Boolean  (readonly)
  
This method was written to support client-side encryption functionality. It is not recommended that this method be used in service of any other feature or behavior.
Whether this message represents a bulk write. A bulk write is an insert, update, or delete operation that encompasses multiple operations of the same type.
# File 'lib/mongo/protocol/msg.rb', line 272
def bulk_write?
  inserts = @main_document['documents']
  updates = @main_document['updates']
  deletes = @main_document['deletes']

  num_inserts = inserts && inserts.length || 0
  num_updates = updates && updates.length || 0
  num_deletes = deletes && deletes.length || 0

  num_inserts > 1 || num_updates > 1 || num_deletes > 1
end
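The same rule can be tried on plain hashes. This is a minimal sketch, not the driver's code: bulk_write_command? is a hypothetical helper that takes the command document as an argument instead of reading @main_document.

```ruby
# Hypothetical helper: true when any of the write arrays holds more than
# one operation, mirroring the check in #bulk_write?.
def bulk_write_command?(main_document)
  %w[documents updates deletes].any? do |key|
    items = main_document[key]
    !items.nil? && items.length > 1
  end
end

single_insert = { 'insert' => 'users', 'documents' => [{ 'name' => 'a' }] }
multi_insert  = { 'insert' => 'users', 'documents' => [{ 'name' => 'a' }, { 'name' => 'b' }] }

puts bulk_write_command?(single_insert)
puts bulk_write_command?(multi_insert)
```

A command with a single document in its write array is not treated as a bulk write, and neither is a command that has none of the three arrays.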
    #flags  ⇒ Array<Symbol>  (rw)
  
    #replyable?  ⇒ true, false  (readonly)
  
Whether the message expects a reply from the database.
# File 'lib/mongo/protocol/msg.rb', line 108
def replyable?
  @replyable ||= !flags.include?(:more_to_come)
end
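A small sketch of how this predicate behaves, assuming nothing beyond standard Ruby semantics. FlagHolder is a hypothetical class, and the checks counter exists only to make the evaluation count visible. Because `||=` assigns again whenever the stored value is falsy, a false result is recomputed on every call rather than cached; the returned value is the same either way.

```ruby
# Hypothetical class that mimics the memoised predicate above.
class FlagHolder
  attr_reader :checks

  def initialize(flags)
    @flags = flags
    @checks = 0
  end

  def replyable?
    @replyable ||= begin
      @checks += 1 # count how often the flag list is actually inspected
      !@flags.include?(:more_to_come)
    end
  end
end

fire_and_forget = FlagHolder.new([:more_to_come])
2.times { fire_and_forget.replyable? }
puts fire_and_forget.checks
```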
Instance Method Details
#add_check_sum(buffer) (private)
# File 'lib/mongo/protocol/msg.rb', line 354
def add_check_sum(buffer)
  if flags.include?(:checksum_present)
    #buffer.put_int32(checksum)
  end
end
#command (private)
#documents
# File 'lib/mongo/protocol/msg.rb', line 196
def documents
  [@main_document]
end
#fix_after_deserialization
Reverse-populates the instance variables after deserialization sets the @sections instance variable to the list of documents.
TODO fix deserialization so that this method is not needed.
# File 'lib/mongo/protocol/msg.rb', line 184
def fix_after_deserialization
  if @sections.nil?
    raise NotImplementedError, "After deserializations @sections should have been initialized"
  end
  if @sections.length != 1
    raise NotImplementedError, "Deserialization must have produced exactly one section, but it produced #{sections.length} sections"
  end
  @main_document = @sections.first
  @sequences = []
  @sections = [{type: 0, payload: @main_document}]
end
#maybe_add_server_api(server_api)
# File 'lib/mongo/protocol/msg.rb', line 284
def maybe_add_server_api(server_api)
  conflicts = {}
  %i(apiVersion apiStrict apiDeprecationErrors).each do |key|
    if @main_document.key?(key)
      conflicts[key] = @main_document[key]
    end
    if @main_document.key?(key.to_s)
      conflicts[key] = @main_document[key.to_s]
    end
  end
  unless conflicts.empty?
    raise Error::ServerApiConflict,
      "The Client is configured with :server_api option but the operation provided the following conflicting parameters: #{conflicts.inspect}"
  end

  main_document = @main_document.merge(
    Utils.transform_server_api(server_api)
  )
  Msg.new(@flags, @options, main_document, *@sequences)
end
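The conflict check looks for each server API parameter under both its symbol and its string key. A minimal sketch of that lookup on a plain Hash follows; SERVER_API_KEYS and server_api_conflicts are hypothetical names, and the sketch returns the conflicts instead of raising Error::ServerApiConflict.

```ruby
# Hypothetical constant listing the parameters checked in the method above.
SERVER_API_KEYS = %i[apiVersion apiStrict apiDeprecationErrors].freeze

# Collect any server API parameters the caller already put in the command,
# whether they were given as symbol keys or as string keys.
def server_api_conflicts(document)
  SERVER_API_KEYS.each_with_object({}) do |key, conflicts|
    conflicts[key] = document[key] if document.key?(key)
    conflicts[key] = document[key.to_s] if document.key?(key.to_s)
  end
end

command = { 'find' => 'users', 'apiVersion' => '1' }
p server_api_conflicts(command)
```

An empty result corresponds to the case where the method goes on to merge the client's server API settings into the command.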
#maybe_compress(compressor, zlib_compression_level = nil) ⇒ Message
Compress the message, if the command being sent permits compression. Otherwise returns self.
# File 'lib/mongo/protocol/msg.rb', line 174
def maybe_compress(compressor, zlib_compression_level = nil)
  compress_if_possible(command.keys.first, compressor, zlib_compression_level)
end
    #maybe_decrypt(context)  ⇒ Msg 
  
Possibly decrypt this message with libmongocrypt. The message is decrypted only if the specified client exists, that client has been given auto-encryption options, and this message is eligible for decryption. A message is eligible for decryption if it represents one of the command types allow-listed by libmongocrypt and it contains data that is required to be encrypted by a local or remote JSON schema.
# File 'lib/mongo/protocol/msg.rb', line 251
def maybe_decrypt(context)
  if context.decrypt?
    cmd = merge_sections
    enc_cmd = context.decrypt(cmd)
    Msg.new(@flags, @options, enc_cmd)
  else
    self
  end
end
    #maybe_encrypt(connection, context)  ⇒ Msg 
  
Possibly encrypt this message with libmongocrypt. The message is encrypted only if the specified client exists, that client has been given auto-encryption options, the client has not been instructed to bypass auto-encryption, and mongocryptd determines that this message is eligible for encryption. A message is eligible for encryption if it represents one of the command types allow-listed by libmongocrypt and it contains data that is required to be encrypted by a local or remote JSON schema.
# File 'lib/mongo/protocol/msg.rb', line 214
def maybe_encrypt(connection, context)
  # TODO verify compression happens later, i.e. when this method runs
  # the message is not compressed.
  if context.encrypt?
    if connection.description.max_wire_version < 8
      raise Error::CryptError.new(
        "Cannot perform encryption against a MongoDB server older than " +
        "4.2 (wire version less than 8). Currently connected to server " +
        "with max wire version #{connection.description.max_wire_version}} " +
        "(Auto-encryption requires a minimum MongoDB version of 4.2)"
      )
    end

    db_name = @main_document[DATABASE_IDENTIFIER]
    cmd = merge_sections
    enc_cmd = context.encrypt(db_name, cmd)
    if cmd.key?('$db') && !enc_cmd.key?('$db')
      enc_cmd['$db'] = cmd['$db']
    end

    Msg.new(@flags, @options, enc_cmd)
  else
    self
  end
end
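Two steps in this method are easy to isolate: the wire version guard and the restoration of `$db` when the encrypted command comes back without it. The sketch below shows both on plain values. encryption_supported? and restore_db are hypothetical helpers, and no real encryption takes place.

```ruby
# Assumed from the listing above: wire version 8 corresponds to MongoDB 4.2.
MIN_ENCRYPTION_WIRE_VERSION = 8

# Hypothetical guard: auto-encryption needs at least wire version 8.
def encryption_supported?(max_wire_version)
  max_wire_version >= MIN_ENCRYPTION_WIRE_VERSION
end

# Hypothetical helper: copy '$db' from the original command when the
# encrypted command lacks it. Returns a new Hash and leaves inputs untouched.
def restore_db(cmd, enc_cmd)
  if cmd.key?('$db') && !enc_cmd.key?('$db')
    enc_cmd.merge('$db' => cmd['$db'])
  else
    enc_cmd
  end
end

original  = { 'find' => 'users', '$db' => 'test' }
encrypted = { 'find' => 'users' } # stand-in for the encrypted command
p restore_db(original, encrypted)
```

Unlike the method above, which assigns into enc_cmd in place, this sketch returns a copy so that the example has no side effects.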
    #number_returned  ⇒ Integer 
  
Returns the number of documents returned from the server.
The Msg instance must be for a server reply and the reply must return an active cursor (either a newly created one or one whose iteration is continuing via getMore).
# File 'lib/mongo/protocol/msg.rb', line 311
def number_returned
  if doc = documents.first
    if cursor = doc['cursor']
      if batch = cursor['firstBatch'] || cursor['nextBatch']
        return batch.length
      end
    end
  end
  raise NotImplementedError, "number_returned is only defined for cursor replies"
end
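The lookup can be exercised against hand-written reply documents. This is a minimal sketch: batch_length is a hypothetical stand-alone helper, and the reply hashes are illustrative rather than captured server output.

```ruby
# Hypothetical helper: length of the cursor batch in a reply document,
# using 'firstBatch' for a new cursor and 'nextBatch' for a getMore reply.
def batch_length(reply)
  cursor = reply['cursor']
  batch = cursor && (cursor['firstBatch'] || cursor['nextBatch'])
  if batch.nil?
    raise NotImplementedError, 'batch_length is only defined for cursor replies'
  end
  batch.length
end

find_reply     = { 'cursor' => { 'id' => 1, 'firstBatch' => [{ 'a' => 1 }, { 'a' => 2 }] }, 'ok' => 1 }
get_more_reply = { 'cursor' => { 'id' => 1, 'nextBatch' => [] }, 'ok' => 1 }

puts batch_length(find_reply)
puts batch_length(get_more_reply)
```

A reply without a cursor sub-document, such as a plain `{ 'ok' => 1 }`, raises NotImplementedError in this sketch, matching the method above.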
    #payload  ⇒ BSON::Document 
  
Return the event payload for monitoring.
# File 'lib/mongo/protocol/msg.rb', line 120
def payload
  # Reorder keys in main_document for better logging - see
  # https://jira.mongodb.org/browse/RUBY-1591.
  # Note that even without the reordering, the payload is not an exact
  # match to what is sent over the wire because the command as used in
  # the published event combines keys from multiple sections of the
  # payload sent over the wire.
  ordered_command = {}
  skipped_command = {}
  command.each do |k, v|
    if INTERNAL_KEYS.member?(k.to_s)
      skipped_command[k] = v
    else
      ordered_command[k] = v
    end
  end
  ordered_command.update(skipped_command)

  BSON::Document.new(
    command_name: ordered_command.keys.first.to_s,
    database_name: @main_document[DATABASE_IDENTIFIER],
    command: ordered_command,
    request_id: request_id,
    reply: @main_document,
  )
end
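The reordering step on its own can be sketched with a plain Hash, relying on Ruby hashes preserving insertion order. DRIVER_KEYS and reorder_for_logging are hypothetical names, and the key list is copied from INTERNAL_KEYS as shown on this page.

```ruby
require 'set'

# Copy of the INTERNAL_KEYS values listed above, under a hypothetical name.
DRIVER_KEYS = Set.new(%w[$clusterTime $db lsid signature txnNumber]).freeze

# Move driver-added keys to the end while keeping the relative order
# of the remaining keys.
def reorder_for_logging(command)
  user_pairs, driver_pairs = command.partition do |key, _value|
    !DRIVER_KEYS.member?(key.to_s)
  end
  (user_pairs + driver_pairs).to_h
end

command = { '$db' => 'test', 'find' => 'users', 'lsid' => {}, 'filter' => {} }
p reorder_for_logging(command).keys
```

After reordering, the first key is the command name, which is what the payload method relies on when it reads `ordered_command.keys.first`.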
    #sections  ⇒ Array<Hash> | Array<BSON::Document> 
  
The sections that will be serialized, or the documents that have been deserialized.
Usually the sections contain OP_MSG-compliant sections derived from @main_document and @sequences. The information in @main_document and @sequences is duplicated in the sections.
When a Msg instance is deserialized, sections is temporarily an array of the documents returned in the type 0 section of the OP_MSG wire protocol message. The #fix_after_deserialization method then mutates the object so that sections, @main_document and @sequences are what they would have been had the Msg instance been built with the constructor rather than deserialized.
# File 'lib/mongo/protocol/msg.rb', line 420
field :sections, Sections
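To make the section layout concrete, here is a minimal sketch of how a main document and its document sequences might be arranged. The `type`, `payload` and `sequence` keys follow the listings on this page; the `identifier` key and the build_sections helper are assumptions made for illustration.

```ruby
# Hypothetical helper: one type 0 section for the main document, then one
# type 1 section per document sequence.
def build_sections(main_document, sequences = {})
  sequence_sections = sequences.map do |identifier, documents|
    # :identifier is an assumed key name; :sequence appears in the listings here.
    { type: 1, payload: { identifier: identifier, sequence: documents } }
  end
  [{ type: 0, payload: main_document }] + sequence_sections
end

sections = build_sections(
  { 'insert' => 'users', '$db' => 'test' },
  'documents' => [{ 'name' => 'a' }, { 'name' => 'b' }]
)
p sections.map { |section| section[:type] }
```

With no sequences, the result is a single type 0 section, which is the shape that #fix_after_deserialization restores.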
    #serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil)  ⇒ BSON::ByteBuffer 
  
Serializes message into bytes that can be sent on the wire.
# File 'lib/mongo/protocol/msg.rb', line 155
def serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil)
  validate_document_size!(max_bson_size)

  super
  add_check_sum(buffer)
  buffer
end
#validate_document_size!(max_bson_size) (private)
Validate that the documents in this message are all smaller than the maxBsonObjectSize. If not, raise an exception.
# File 'lib/mongo/protocol/msg.rb', line 326
def validate_document_size!(max_bson_size)
  max_bson_size ||= Mongo::Server::ConnectionBase::DEFAULT_MAX_BSON_OBJECT_SIZE

  contains_too_large_document = @sections.any? do |section|
    section[:type] == 1 &&
      section[:payload][:sequence].any? do |document|
        document.to_bson.length > max_bson_size
      end
  end

  if contains_too_large_document
    raise Error::MaxBSONSize.new('The document exceeds maximum allowed BSON object size after serialization')
  end
end
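As the listing shows, only documents inside type 1 sections are measured. The sketch below reproduces that scan without the bson gem: oversized_sequence_document? is a hypothetical helper, and the default size_of lambda is a crude stand-in for `to_bson.length` that does not produce real BSON sizes.

```ruby
# Hypothetical helper: true when any document in a type 1 section exceeds
# max_size according to the supplied size_of callable.
def oversized_sequence_document?(sections, max_size,
                                 size_of: ->(doc) { doc.to_s.bytesize })
  sections.any? do |section|
    section[:type] == 1 &&
      section[:payload][:sequence].any? { |doc| size_of.call(doc) > max_size }
  end
end

sections = [
  { type: 0, payload: { 'insert' => 'users' } },
  { type: 1, payload: { sequence: [{ 'data' => 'x' * 10 }, { 'data' => 'x' * 500 }] } }
]

# Measure only the 'data' string so the arithmetic is easy to follow.
by_data = ->(doc) { doc['data'].bytesize }
puts oversized_sequence_document?(sections, 100, size_of: by_data)
```

A large type 0 payload on its own does not trip this check, since the scan skips sections whose type is not 1.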