Class: Mongo::Protocol::Message Abstract
Relationships & Source Files | |
Extension / Inclusion / Inheritance Descendants | |
Subclasses:
|
|
Super Chains via Extension / Inclusion / Inheritance | |
Instance Chain:
self,
Serializers ,
::Mongo::Id
|
|
Inherits: | Object |
Defined in: | lib/mongo/protocol/message.rb |
Overview
A base class providing functionality required by all messages in the MongoDB wire protocol. It provides a minimal DSL for defining typed fields to enable serialization and deserialization over the wire.
Constant Summary
-
BATCH_SIZE =
The batch size constant.
'batchSize'.freeze
-
COLLECTION =
The collection constant.
'collection'.freeze
-
LIMIT =
The limit constant.
'limit'.freeze
-
MAX_MESSAGE_SIZE =
Default max message size of 48MB.
50331648.freeze
-
ORDERED =
The ordered constant.
'ordered'.freeze
-
Q =
The q constant.
'q'.freeze
Serializers
- Included
Class Method Summary
-
.deserialize(io, max_message_size = MAX_MESSAGE_SIZE, expected_response_to = nil, options = {}) ⇒ Message
Internal use only
Internal use only
Deserializes messages from an IO stream.
-
.new(*args) ⇒ Message
constructor
:nodoc:
-
.deserialize_array(message, io, field, options = {}) ⇒ Message
private
Deserializes an array of fields in a message.
-
.deserialize_field(message, io, field, options = {}) ⇒ Message
private
Deserializes a single field in a message.
-
.deserialize_header(io) ⇒ Array<Fixnum>
private
Deserializes the header of the message.
-
.field(name, type, multi = false) ⇒ NilClass
private
A method for declaring a message field.
-
.fields ⇒ Integer
private
A class method for getting the fields for a message class.
Instance Attribute Summary
-
#replyable? ⇒ false
readonly
The default for messages is not to require a reply after sending a message to the server.
-
#request_id ⇒ Fixnum
readonly
Returns the request id for the message.
Instance Method Summary
-
#==(other) ⇒ true, false
(also: #eql?)
Tests for equality between two wire protocol messages by comparing class and field values.
-
#eql?(other)
Alias for #==.
-
#hash ⇒ Fixnum
Creates a hash from the values of the fields of a message.
- #maybe_add_server_api(server_api)
-
#maybe_compress(compressor, zlib_compression_level = nil) ⇒ self
Internal use only
Internal use only
Compress the message, if supported by the wire protocol used and if the command being sent permits compression.
-
#maybe_decrypt(context) ⇒ Mongo::Protocol::Msg
Possibly decrypt this message with libmongocrypt.
-
#maybe_encrypt(connection, context) ⇒ Mongo::Protocol::Msg
Possibly encrypt this message with libmongocrypt.
-
#maybe_inflate ⇒ Protocol::Message
Internal use only
Internal use only
Inflate a message if it is compressed.
-
#number_returned ⇒ 0
Default number returned value for protocol messages.
-
#serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil) ⇒ String
(also: #to_s)
Serializes message into bytes that can be sent on the wire.
-
#set_request_id ⇒ Fixnum
Generates a request id for a message.
-
#to_s(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil)
Alias for #serialize.
-
#compress_if_possible(command_name, compressor, zlib_compression_level) ⇒ Message
private
Compress the message, if the command being sent permits compression.
-
#fields ⇒ Integer
private
A method for getting the fields for a message class.
- #merge_sections private
-
#serialize_fields(buffer, max_bson_size = nil) ⇒ String
private
Serializes message fields into a buffer.
-
#serialize_header(buffer) ⇒ String
private
Serializes the header of the message consisting of 4 32bit integers.
Constructor Details
.new(*args) ⇒ Message
:nodoc:
# File 'lib/mongo/protocol/message.rb', line 79
def initialize(*args) # :nodoc: set_request_id end
Class Method Details
.deserialize(io, max_message_size = MAX_MESSAGE_SIZE, expected_response_to = nil, options = {}) ⇒ Message
Deserializes messages from an IO stream.
This method returns decompressed messages (i.e. if the message on the wire was OP_COMPRESSED, this method would typically return the OP_MSG message that is the result of decompression).
# File 'lib/mongo/protocol/message.rb', line 238
def self.deserialize(io, = MAX_MESSAGE_SIZE, expected_response_to = nil, = {} ) # io is usually a Mongo::Socket instance, which supports the # timeout option. For compatibility with whoever might call this # method with some other IO-like object, pass options only when they # are not empty. = .slice(:timeout, :socket_timeout) if .empty? chunk = io.read(16) else chunk = io.read(16, ** ) end buf = BSON::ByteBuffer.new(chunk) length, _request_id, response_to, _op_code = deserialize_header(buf) # Protection from potential DOS man-in-the-middle attacks. See # DRIVERS-276. if length > ( || MAX_MESSAGE_SIZE) raise Error::MaxMessageSize.new( ) end # Protection against returning the response to a previous request. See # RUBY-1117 if expected_response_to && response_to != expected_response_to raise Error::UnexpectedResponse.new(expected_response_to, response_to) end if .empty? chunk = io.read(length - 16) else chunk = io.read(length - 16, ** ) end buf = BSON::ByteBuffer.new(chunk) = Registry.get(_op_code).allocate .send(:fields).each do |field| if field[:multi] deserialize_array(, buf, field, ) else deserialize_field(, buf, field, ) end end if .is_a?(Msg) .fix_after_deserialization end .maybe_inflate end
.deserialize_array(message, io, field, options = {}) ⇒ Message
(private)
Deserializes an array of fields in a message
The number of items in the array must be described by a previously deserialized field specified in the class by the field dsl under the key :multi
# File 'lib/mongo/protocol/message.rb', line 432
def self.deserialize_array(, io, field, = {}) elements = [] count = .instance_variable_get(field[:multi]) count.times { elements << field[:type].deserialize(io, ) } .instance_variable_set(field[:name], elements) end
.deserialize_field(message, io, field, options = {}) ⇒ Message
(private)
Deserializes a single field in a message
# File 'lib/mongo/protocol/message.rb', line 450
def self.deserialize_field(, io, field, = {}) .instance_variable_set( field[:name], field[:type].deserialize(io, ) ) end
.deserialize_header(io) ⇒ Array
<Fixnum
> (private)
Deserializes the header of the message
# File 'lib/mongo/protocol/message.rb', line 388
def self.deserialize_header(io) Header.deserialize(io) end
.field(name, type, multi = false) ⇒ NilClass
(private)
A method for declaring a message field
# File 'lib/mongo/protocol/message.rb', line 407
def self.field(name, type, multi = false) fields << { :name => "@#{name}".intern, :type => type, :multi => multi } attr_reader name end
.fields ⇒ Integer
(private)
A class method for getting the fields for a message class
# File 'lib/mongo/protocol/message.rb', line 340
def self.fields @fields ||= [] end
Instance Attribute Details
#replyable? ⇒ false
(readonly)
The default for messages is not to require a reply after sending a message to the server.
# File 'lib/mongo/protocol/message.rb', line 97
def replyable? false end
#request_id ⇒ Fixnum
(readonly)
Returns the request id for the message
# File 'lib/mongo/protocol/message.rb', line 86
attr_reader :request_id
Instance Method Details
#==(other) ⇒ true
, false
Also known as: #eql?
Tests for equality between two wire protocol messages by comparing class and field values.
#compress_if_possible(command_name, compressor, zlib_compression_level) ⇒ Message
(private)
Compress the message, if the command being sent permits compression. Otherwise returns self.
# File 'lib/mongo/protocol/message.rb', line 127
private def compress_if_possible(command_name, compressor, zlib_compression_level) if compressor && compression_allowed?(command_name) Compressed.new(self, compressor, zlib_compression_level) else self end end
#eql?(other)
Alias for #==.
# File 'lib/mongo/protocol/message.rb', line 303
alias_method :eql?, :==
#fields ⇒ Integer
(private)
A method for getting the fields for a message class
# File 'lib/mongo/protocol/message.rb', line 333
def fields self.class.fields end
#hash ⇒ Fixnum
Creates a hash from the values of the fields of a message.
#maybe_add_server_api(server_api)
# File 'lib/mongo/protocol/message.rb', line 173
def maybe_add_server_api(server_api) raise Error::ServerApiNotSupported, "Server API parameters cannot be sent to pre-3.6 MongoDB servers. Please remove the :server_api parameter from Client options or use MongoDB 3.6 or newer" end
#maybe_compress(compressor, zlib_compression_level = nil) ⇒ self
Compress the message, if supported by the wire protocol used and if the command being sent permits compression. Otherwise returns self.
# File 'lib/mongo/protocol/message.rb', line 112
def maybe_compress(compressor, zlib_compression_level = nil) self end
#maybe_decrypt(context) ⇒ Mongo::Protocol::Msg
Possibly decrypt this message with libmongocrypt.
# File 'lib/mongo/protocol/message.rb', line 152
def maybe_decrypt(context) # TODO determine if we should be decrypting data coming from pre-4.2 # servers, potentially using legacy wire protocols. If so we need # to implement decryption for those wire protocols as our current # encryption/decryption code is OP_MSG-specific. self end
#maybe_encrypt(connection, context) ⇒ Mongo::Protocol::Msg
Possibly encrypt this message with libmongocrypt.
# File 'lib/mongo/protocol/message.rb', line 168
def maybe_encrypt(connection, context) # Do nothing if the Message subclass has not implemented this method self end
#maybe_inflate ⇒ Protocol::Message
Inflate a message if it is compressed.
# File 'lib/mongo/protocol/message.rb', line 142
def maybe_inflate self end
#merge_sections (private)
[ GitHub ]# File 'lib/mongo/protocol/message.rb', line 177
private def merge_sections cmd = if @sections.length > 1 cmd = @sections.detect { |section| section[:type] == 0 }[:payload] identifier = @sections.detect { |section| section[:type] == 1}[:payload][:identifier] cmd.merge(identifier.to_sym => @sections.select { |section| section[:type] == 1 }. map { |section| section[:payload][:sequence] }. inject([]) { |arr, documents| arr + documents } ) elsif @sections.first[:payload] @sections.first[:payload] else @sections.first end if cmd.nil? raise "The command should never be nil here" end cmd end
#number_returned ⇒ 0
Default number returned value for protocol messages.
# File 'lib/mongo/protocol/message.rb', line 326
def number_returned; 0; end
#serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil) ⇒ String
Also known as: #to_s
Serializes message into bytes that can be sent on the wire
# File 'lib/mongo/protocol/message.rb', line 201
def serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil) max_size = if max_bson_size && bson_overhead max_bson_size + bson_overhead elsif max_bson_size max_bson_size else nil end start = buffer.length serialize_header(buffer) serialize_fields(buffer, max_size) buffer.replace_int32(start, buffer.length - start) end
#serialize_fields(buffer, max_bson_size = nil) ⇒ String
(private)
Serializes message fields into a buffer
# File 'lib/mongo/protocol/message.rb', line 348
def serialize_fields(buffer, max_bson_size = nil) fields.each do |field| value = instance_variable_get(field[:name]) if field[:multi] value.each do |item| if field[:type].respond_to?(:size_limited?) field[:type].serialize(buffer, item, max_bson_size) else field[:type].serialize(buffer, item) end end else if field[:type].respond_to?(:size_limited?) field[:type].serialize(buffer, value, max_bson_size) else field[:type].serialize(buffer, value) end end end end
#serialize_header(buffer) ⇒ String
(private)
Serializes the header of the message consisting of 4 32bit integers
The integers represent a message length placeholder (calculation of the actual length is deferred) the request id, the response to id, and the op code for the message
Currently uses hardcoded 0 for request id and response to as their values are irrelevent to the server
# File 'lib/mongo/protocol/message.rb', line 380
def serialize_header(buffer) Header.serialize(buffer, [0, request_id, 0, op_code]) end
#set_request_id ⇒ Fixnum
Generates a request id for a message
# File 'lib/mongo/protocol/message.rb', line 317
def set_request_id @request_id = self.class.next_id end
#to_s(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil)
Alias for #serialize.
# File 'lib/mongo/protocol/message.rb', line 217
alias_method :to_s, :serialize