Class: Mongo::Protocol::Msg Private
Relationships & Source Files
Class Chain: self, Message
Inherits: Mongo::Protocol::Message
Defined in: lib/mongo/protocol/msg.rb
Overview
MongoDB Wire protocol Msg message (OP_MSG), a bi-directional wire protocol opcode.
OP_MSG is only available in MongoDB 3.6 (maxWireVersion >= 6) and later.
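As a rough illustration of the framing behind this opcode, the sketch below packs the standard 16-byte wire protocol header (messageLength, requestID, responseTo, opCode) followed by the 4-byte flag bits that precede the sections. It relies only on Array#pack. The helper name frame_op_msg and the hand-built empty-document section are assumptions made for illustration; this is not the driver's serializer.

```ruby
OP_MSG_CODE = 2013 # same value as the OP_CODE constant documented on this page

# Hypothetical helper: wraps already-serialized section bytes in an OP_MSG frame.
def frame_op_msg(request_id, flag_bits, section_bytes)
  body = [flag_bits].pack('L<') + section_bytes.b   # uint32 flag bits, then sections
  length = 16 + body.bytesize                       # header is four little-endian int32s
  header = [length, request_id, 0, OP_MSG_CODE].pack('l<l<l<l<')
  header + body
end

# Section kind byte 0 followed by an empty BSON document
# (int32 length of 5, then the terminating NUL byte).
empty_doc_section = "\x00".b + [5].pack('l<') + "\x00".b
frame = frame_op_msg(1, 0, empty_doc_section)

length, request_id, response_to, op_code = frame.unpack('l<l<l<l<')
puts "length=#{length} request_id=#{request_id} response_to=#{response_to} op_code=#{op_code}"
```

The real class delegates this work to the inherited Message#serialize and the field serializers, so treat the sketch only as a picture of the byte layout.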
Constant Summary
-
DATABASE_IDENTIFIER =
The identifier for the database name to execute the command on.
'$db'.freeze
-
FLAGS =
Available flags for an OP_MSG message.
Array.new(16).tap do |arr|
  arr[0] = :checksum_present
  arr[1] = :more_to_come
  arr[16] = :exhaust_allowed
end.freeze
-
INTERNAL_KEYS =
Keys that the driver adds to commands. These are going to be moved to the end of the hash for better logging.
Set.new(%w($clusterTime $db lsid signature txnNumber)).freeze
-
KNOWN_FLAGS =
{ checksum_present: true, more_to_come: true, exhaust_allowed: true, }
-
OP_CODE =
The operation code required to specify an OP_MSG message.
2013
Serializers
- Included
HEADER_PACK, INT32_PACK, INT64_PACK, NULL, ZERO
Message
- Inherited
BATCH_SIZE, COLLECTION, LIMIT, MAX_MESSAGE_SIZE, ORDERED, Q
::Mongo::Monitoring::Event::Secure
- Included
Class Method Summary
-
.new(flags, options, main_document, *sequences) ⇒ Msg
constructor
Internal use only
Creates a new OP_MSG protocol message.
Message
- Inherited
.deserialize | Deserializes messages from an IO stream. |
.new | :nodoc: |
.deserialize_array | Deserializes an array of fields in a message. |
.deserialize_field | Deserializes a single field in a message. |
.deserialize_header | Deserializes the header of the message. |
.field | A method for declaring a message field. |
.fields | A class method for getting the fields for a message class. |
Instance Attribute Summary
-
#bulk_write? ⇒ Boolean
readonly
Internal use only
Whether this message represents a bulk write.
- #flags ⇒ Array<Symbol> rw
-
#replyable? ⇒ true, false
readonly
Internal use only
Whether the message expects a reply from the database.
Message
- Inherited
#replyable? | The default for messages is not to require a reply after sending a message to the server. |
#request_id | Returns the request id for the message. |
Instance Method Summary
- #documents Internal use only
-
#fix_after_deserialization
Internal use only
Reverse-populates the instance variables after deserialization sets the @sections instance variable to the list of documents.
- #maybe_add_server_api(server_api) Internal use only
-
#maybe_compress(compressor, zlib_compression_level = nil) ⇒ Message
Internal use only
Compress the message, if the command being sent permits compression.
-
#maybe_decrypt(context) ⇒ Mongo::Protocol::Msg
Internal use only
Possibly decrypt this message with libmongocrypt.
-
#maybe_encrypt(connection, context) ⇒ Mongo::Protocol::Msg
Internal use only
Possibly encrypt this message with libmongocrypt.
-
#number_returned ⇒ Integer
Internal use only
Returns the number of documents returned from the server.
-
#payload ⇒ BSON::Document
Internal use only
Return the event payload for monitoring.
-
#sections ⇒ Array<Hash> | Array<BSON::Document>
Internal use only
The sections that will be serialized, or the documents that have been deserialized.
-
#serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil) ⇒ BSON::ByteBuffer
Internal use only
Serializes message into bytes that can be sent on the wire.
- #add_check_sum(buffer) private Internal use only
- #command private Internal use only
-
#validate_document_size!(max_bson_size)
private
Internal use only
Validate that the documents in this message are all smaller than the maxBsonObjectSize.
::Mongo::Monitoring::Event::Secure
- Included
#compression_allowed? | Is compression allowed for a given command message. |
#redacted | Redact secure information from the document if: |
#sensitive? | Check whether the command is sensitive in terms of command monitoring spec. |
Message
- Inherited
#== | Tests for equality between two wire protocol messages by comparing class and field values. |
#eql? | Alias for Message#==. |
#hash | Creates a hash from the values of the fields of a message. |
#maybe_add_server_api | |
#maybe_compress | Compress the message, if supported by the wire protocol used and if the command being sent permits compression. |
#maybe_decrypt | Possibly decrypt this message with libmongocrypt. |
#maybe_encrypt | Possibly encrypt this message with libmongocrypt. |
#maybe_inflate | Inflate a message if it is compressed. |
#number_returned | Default number returned value for protocol messages. |
#serialize | Serializes message into bytes that can be sent on the wire. |
#set_request_id | Generates a request id for a message. |
#to_s | Alias for Message#serialize. |
#compress_if_possible | Compress the message, if the command being sent permits compression. |
#fields | A method for getting the fields for a message class. |
#merge_sections | |
#serialize_fields | Serializes message fields into a buffer. |
#serialize_header | Serializes the header of the message consisting of 4 32bit integers. |
Instance Attribute Details
#bulk_write? ⇒ Boolean (readonly)
This method was written to support client-side encryption functionality. It is not recommended that this method be used in service of any other feature or behavior.
Whether this message represents a bulk write. A bulk write is an insert, update, or delete operation that encompasses multiple operations of the same type.
# File 'lib/mongo/protocol/msg.rb', line 272
def bulk_write?
  inserts = @main_document['documents']
  updates = @main_document['updates']
  deletes = @main_document['deletes']

  num_inserts = inserts && inserts.length || 0
  num_updates = updates && updates.length || 0
  num_deletes = deletes && deletes.length || 0

  num_inserts > 1 || num_updates > 1 || num_deletes > 1
end
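The same rule can be tried out on plain hashes without a driver connection. The following is a minimal sketch; bulk_write_command? is a hypothetical stand-alone helper that takes the command document as an argument instead of reading @main_document.

```ruby
# Hypothetical helper mirroring the rule above: a command counts as a bulk
# write when any of its operation arrays holds more than one entry.
def bulk_write_command?(command)
  %w[documents updates deletes].any? do |key|
    ops = command[key]
    ops.respond_to?(:length) && ops.length > 1
  end
end

single_insert = { 'insert' => 'users', 'documents' => [{ 'name' => 'a' }] }
multi_insert  = { 'insert' => 'users', 'documents' => [{ 'name' => 'a' }, { 'name' => 'b' }] }

puts bulk_write_command?(single_insert)
puts bulk_write_command?(multi_insert)
```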
#flags ⇒ Array<Symbol> (rw)
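The flag symbols correspond to bits in the OP_MSG flagBits field. The sketch below assumes that a symbol's index in FLAGS is its bit position, which agrees with the OP_MSG layout (checksumPresent is bit 0, moreToCome is bit 1, exhaustAllowed is bit 16). The helpers flags_to_bits and bits_to_flags are hypothetical names, not driver methods.

```ruby
# Copy of the FLAGS constant shown in the Constant Summary.
FLAGS = Array.new(16).tap do |arr|
  arr[0] = :checksum_present
  arr[1] = :more_to_come
  arr[16] = :exhaust_allowed
end.freeze

# Hypothetical helper: turn a list of flag symbols into an integer bit mask.
def flags_to_bits(flags)
  flags.reduce(0) do |bits, flag|
    index = FLAGS.index(flag)
    raise ArgumentError, "Unknown flag: #{flag.inspect}" if index.nil?
    bits | (1 << index)
  end
end

# Hypothetical helper: recover the flag symbols that are set in a bit mask.
def bits_to_flags(bits)
  FLAGS.each_index.select { |i| FLAGS[i] && bits[i] == 1 }.map { |i| FLAGS[i] }
end

mask = flags_to_bits([:more_to_come, :exhaust_allowed])
puts mask.to_s(2)
puts bits_to_flags(mask).inspect
```

Note that assigning arr[16] on an array created with Array.new(16) grows it to 17 elements, so the constant has one slot per bit from 0 through 16.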
#replyable? ⇒ true, false (readonly)
Whether the message expects a reply from the database.
# File 'lib/mongo/protocol/msg.rb', line 108
def replyable?
  @replyable ||= !flags.include?(:more_to_come)
end
Instance Method Details
#add_check_sum(buffer) (private)
# File 'lib/mongo/protocol/msg.rb', line 354
def add_check_sum(buffer)
  if flags.include?(:checksum_present)
    #buffer.put_int32(checksum)
  end
end
#command (private)
#documents
# File 'lib/mongo/protocol/msg.rb', line 196
def documents
  [@main_document]
end
#fix_after_deserialization
Reverse-populates the instance variables after deserialization sets the @sections instance variable to the list of documents.
TODO fix deserialization so that this method is not needed.
# File 'lib/mongo/protocol/msg.rb', line 184
def fix_after_deserialization
  if @sections.nil?
    raise NotImplementedError, "After deserializations @sections should have been initialized"
  end
  if @sections.length != 1
    raise NotImplementedError, "Deserialization must have produced exactly one section, but it produced #{sections.length} sections"
  end
  @main_document = @sections.first
  @sequences = []
  @sections = [{type: 0, payload: @main_document}]
end
#maybe_add_server_api(server_api)
# File 'lib/mongo/protocol/msg.rb', line 284
def maybe_add_server_api(server_api)
  conflicts = {}
  %i(apiVersion apiStrict apiDeprecationErrors).each do |key|
    if @main_document.key?(key)
      conflicts[key] = @main_document[key]
    end
    if @main_document.key?(key.to_s)
      conflicts[key] = @main_document[key.to_s]
    end
  end
  unless conflicts.empty?
    raise Error::ServerApiConflict, "The Client is configured with :server_api option but the operation provided the following conflicting parameters: #{conflicts.inspect}"
  end

  main_document = @main_document.merge(
    Utils.transform_server_api(server_api)
  )
  Msg.new(@flags, @options, main_document, *@sequences)
end
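The conflict check looks for each server API parameter under both its symbol and string key. A minimal sketch of just that detection step on a plain hash follows; server_api_conflicts and SERVER_API_KEYS are hypothetical names introduced here.

```ruby
SERVER_API_KEYS = %i[apiVersion apiStrict apiDeprecationErrors].freeze

# Hypothetical helper: collect server API parameters that the caller already
# put into the command, whether keyed by symbol or by string.
def server_api_conflicts(command)
  SERVER_API_KEYS.each_with_object({}) do |key, conflicts|
    conflicts[key] = command[key] if command.key?(key)
    conflicts[key] = command[key.to_s] if command.key?(key.to_s)
  end
end

command = { 'find' => 'users', 'apiVersion' => '1', apiStrict: true }
conflicts = server_api_conflicts(command)
puts conflicts.inspect
puts 'would raise ServerApiConflict' unless conflicts.empty?
```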
#maybe_compress(compressor, zlib_compression_level = nil) ⇒ Message
Compress the message, if the command being sent permits compression. Otherwise returns self.
# File 'lib/mongo/protocol/msg.rb', line 174
def maybe_compress(compressor, zlib_compression_level = nil)
  compress_if_possible(command.keys.first, compressor, zlib_compression_level)
end
#maybe_decrypt(context) ⇒ Msg
Possibly decrypt this message with libmongocrypt. Message will only be decrypted if the specified client exists, that client has been given auto-encryption options, and this message is eligible for decryption. A message is eligible for decryption if it represents one of the command types allow-listed by libmongocrypt and it contains data that is required to be encrypted by a local or remote json schema.
# File 'lib/mongo/protocol/msg.rb', line 251
def maybe_decrypt(context)
  if context.decrypt?
    cmd = merge_sections
    enc_cmd = context.decrypt(cmd)
    Msg.new(@flags, @options, enc_cmd)
  else
    self
  end
end
#maybe_encrypt(connection, context) ⇒ Msg
Possibly encrypt this message with libmongocrypt. Message will only be encrypted if the specified client exists, that client has been given auto-encryption options, the client has not been instructed to bypass auto-encryption, and mongocryptd determines that this message is eligible for encryption. A message is eligible for encryption if it represents one of the command types allow-listed by libmongocrypt and it contains data that is required to be encrypted by a local or remote json schema.
# File 'lib/mongo/protocol/msg.rb', line 214
def maybe_encrypt(connection, context)
  # TODO verify compression happens later, i.e. when this method runs
  # the message is not compressed.
  if context.encrypt?
    if connection.description.max_wire_version < 8
      raise Error::CryptError.new(
        "Cannot perform encryption against a MongoDB server older than " +
        "4.2 (wire version less than 8). Currently connected to server " +
        "with max wire version #{connection.description.max_wire_version}} " +
        "(Auto-encryption requires a minimum MongoDB version of 4.2)"
      )
    end

    db_name = @main_document[DATABASE_IDENTIFIER]
    cmd = merge_sections
    enc_cmd = context.encrypt(db_name, cmd)
    if cmd.key?('$db') && !enc_cmd.key?('$db')
      enc_cmd['$db'] = cmd['$db']
    end

    Msg.new(@flags, @options, enc_cmd)
  else
    self
  end
end
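Two details of the listing above are easy to miss: the wire version gate, and the restoration of $db when the encrypted command comes back without it. The sketch below isolates both with a stand-in encryptor. The helper encrypt_command, the constant MIN_ENCRYPTION_WIRE_VERSION, and the lambda that plays the role of libmongocrypt are all assumptions for illustration; ArgumentError stands in for the driver's Error::CryptError.

```ruby
MIN_ENCRYPTION_WIRE_VERSION = 8 # wire version 8 corresponds to MongoDB 4.2

# Hypothetical helper: `encryptor` is any callable that maps a command hash
# to an "encrypted" command hash.
def encrypt_command(cmd, max_wire_version, encryptor)
  if max_wire_version < MIN_ENCRYPTION_WIRE_VERSION
    raise ArgumentError, "wire version #{max_wire_version} is too old for auto-encryption"
  end

  encrypted = encryptor.call(cmd)
  # Put $db back if the encryption step dropped it.
  encrypted['$db'] = cmd['$db'] if cmd.key?('$db') && !encrypted.key?('$db')
  encrypted
end

# Stand-in encryptor that loses the $db key, to exercise the restoration step.
drops_db = ->(cmd) { cmd.reject { |key, _| key == '$db' } }

result = encrypt_command({ 'find' => 'users', '$db' => 'test' }, 8, drops_db)
puts result.inspect
```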
#number_returned ⇒ Integer
Returns the number of documents returned from the server.
The Msg instance must be for a server reply and the reply must return an active cursor (either a newly created one or one whose iteration is continuing via getMore).
# File 'lib/mongo/protocol/msg.rb', line 311
def number_returned
  if doc = documents.first
    if cursor = doc['cursor']
      if batch = cursor['firstBatch'] || cursor['nextBatch']
        return batch.length
      end
    end
  end
  raise NotImplementedError, "number_returned is only defined for cursor replies"
end
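To make the cursor requirement concrete, here is a minimal sketch of the same lookup on plain reply hashes. batch_length is a hypothetical helper, and the reply documents are invented sample data shaped like find and getMore replies.

```ruby
# Hypothetical helper: count the documents in a cursor reply's batch.
def batch_length(reply)
  cursor = reply['cursor']
  batch = cursor && (cursor['firstBatch'] || cursor['nextBatch'])
  raise NotImplementedError, 'batch_length is only defined for cursor replies' unless batch
  batch.length
end

find_reply     = { 'cursor' => { 'id' => 42, 'firstBatch' => [{ 'a' => 1 }, { 'a' => 2 }] }, 'ok' => 1 }
get_more_reply = { 'cursor' => { 'id' => 42, 'nextBatch' => [] }, 'ok' => 1 }

puts batch_length(find_reply)
puts batch_length(get_more_reply)
```

An empty batch still counts as a cursor reply because an empty array is truthy in Ruby. A reply without a cursor, such as one from ping, raises instead.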
#payload ⇒ BSON::Document
Return the event payload for monitoring.
# File 'lib/mongo/protocol/msg.rb', line 120
def payload
  # Reorder keys in main_document for better logging - see
  # https://jira.mongodb.org/browse/RUBY-1591.
  # Note that even without the reordering, the payload is not an exact
  # match to what is sent over the wire because the command as used in
  # the published event combines keys from multiple sections of the
  # payload sent over the wire.
  ordered_command = {}
  skipped_command = {}
  command.each do |k, v|
    if INTERNAL_KEYS.member?(k.to_s)
      skipped_command[k] = v
    else
      ordered_command[k] = v
    end
  end
  ordered_command.update(skipped_command)

  BSON::Document.new(
    command_name: ordered_command.keys.first.to_s,
    database_name: @main_document[DATABASE_IDENTIFIER],
    command: ordered_command,
    request_id: request_id,
    reply: @main_document,
  )
end
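The key reordering can be seen in isolation with a plain hash. This is a minimal sketch; reorder_for_logging is a hypothetical helper, and the command is invented sample data. It relies on Ruby hashes preserving insertion order.

```ruby
require 'set'

# Copy of the INTERNAL_KEYS constant shown in the Constant Summary.
INTERNAL_KEYS = Set.new(%w($clusterTime $db lsid signature txnNumber)).freeze

# Hypothetical helper: move driver-added keys to the end, keeping the relative
# order of everything else.
def reorder_for_logging(command)
  internal, regular = command.partition { |key, _| INTERNAL_KEYS.member?(key.to_s) }
  regular.concat(internal).to_h
end

command = { 'find' => 'users', '$db' => 'test', 'lsid' => { 'id' => 1 }, 'filter' => { 'a' => 1 } }
puts reorder_for_logging(command).keys.inspect
```

Because the command name stays first after reordering, reading the first key still yields the command name, which is what the payload uses for command_name.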
#sections ⇒ Array<Hash> | Array<BSON::Document>
The sections that will be serialized, or the documents that have been deserialized.
Usually the sections contain OP_MSG-compliant sections derived from @main_document and @sequences. The information in @main_document and @sequences is duplicated in the sections.
When deserializing Msg instances, sections is temporarily an array of documents returned in the type 0 section of the OP_MSG wire protocol message. The #fix_after_deserialization method then mutates the object so that sections, @main_document and @sequences are what they would have been had the Msg instance been built with the constructor rather than deserialized.
# File 'lib/mongo/protocol/msg.rb', line 420
field :sections, Sections
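As an illustration of how sections can duplicate the main document and the sequences, the sketch below builds a section list from plain hashes. The type 0 shape and the :sequence key appear in the listings on this page; the :identifier key, the build_sections helper, and passing sequences as an identifier-to-documents hash are assumptions made here.

```ruby
# Hypothetical helper: one type 0 section for the main document, then one
# type 1 section per document sequence.
def build_sections(main_document, sequences = {})
  sections = [{ type: 0, payload: main_document }]
  sequences.each do |identifier, documents|
    sections << { type: 1, payload: { identifier: identifier, sequence: documents } }
  end
  sections
end

sections = build_sections(
  { 'insert' => 'users', '$db' => 'test' },
  { 'documents' => [{ 'name' => 'a' }, { 'name' => 'b' }] }
)
sections.each { |section| puts section.inspect }
```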
#serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil) ⇒ BSON::ByteBuffer
Serializes message into bytes that can be sent on the wire.
# File 'lib/mongo/protocol/msg.rb', line 155
def serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil)
  validate_document_size!(max_bson_size)

  super
  add_check_sum(buffer)
  buffer
end
#validate_document_size!(max_bson_size) (private)
Validate that the documents in this message are all smaller than the maxBsonObjectSize. If not, raise an exception.
# File 'lib/mongo/protocol/msg.rb', line 326
def validate_document_size!(max_bson_size)
  max_bson_size ||= Mongo::Server::ConnectionBase::DEFAULT_MAX_BSON_OBJECT_SIZE

  contains_too_large_document = @sections.any? do |section|
    section[:type] == 1 &&
      section[:payload][:sequence].any? do |document|
        document.to_bson.length > max_bson_size
      end
  end

  if contains_too_large_document
    raise Error::MaxBSONSize.new('The document exceeds maximum allowed BSON object size after serialization')
  end
end
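As listed, the check only inspects documents inside type 1 sections. The sketch below reproduces that traversal on plain hashes. Since it avoids the bson gem, it measures size with JSON byte length as a loudly labelled stand-in for document.to_bson.length; oversized_document? and the sizer parameter are hypothetical.

```ruby
require 'json'

# Stand-in size measure (NOT BSON): JSON byte length of the document.
JSON_SIZER = ->(document) { JSON.generate(document).bytesize }

# Hypothetical helper: true when any document in a type 1 section exceeds max_size.
def oversized_document?(sections, max_size, sizer: JSON_SIZER)
  sections.any? do |section|
    section[:type] == 1 &&
      section[:payload][:sequence].any? { |document| sizer.call(document) > max_size }
  end
end

sections = [
  { type: 0, payload: { 'insert' => 'users' } },
  { type: 1, payload: { sequence: [{ 'name' => 'x' * 100 }] } }
]

puts oversized_document?(sections, 50)
puts oversized_document?(sections, 1_000)
```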