123456789_123456789_123456789_123456789_123456789_

Class: Mongo::Protocol::Message Abstract

Relationships & Source Files
Extension / Inclusion / Inheritance Descendants
Subclasses:
Compressed, GetMore, KillCursors, Mongo::Protocol::Msg, Query, Reply
Super Chains via Extension / Inclusion / Inheritance
Instance Chain:
Inherits: Object
Defined in: lib/mongo/protocol/message.rb

Overview

This class is abstract.

A base class providing functionality required by all messages in the MongoDB wire protocol. It provides a minimal DSL for defining typed fields to enable serialization and deserialization over the wire.

Examples:

class WireProtocolMessage < Message

  private

  def op_code
    1234
  end

  FLAGS = [:first_bit, :bit_two]

  # payload
  field :flags, BitVector.new(FLAGS)
  field :namespace, CString
  field :document, Document
  field :documents, Document, true
end

Constant Summary

Serializers - Included

HEADER_PACK, INT32_PACK, INT64_PACK, NULL, ZERO

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(*args) ⇒ Message

:nodoc:

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 79

def initialize(*args) # :nodoc:
  set_request_id
end

Class Method Details

.deserialize(io, max_message_size = MAX_MESSAGE_SIZE, expected_response_to = nil, options = {}) ⇒ Message

This method is for internal use only.

Deserializes messages from an IO stream.

This method returns decompressed messages (i.e. if the message on the wire was OP_COMPRESSED, this method would typically return the OP_MSG message that is the result of decompression).

Parameters:

  • max_message_size (Integer) (defaults to: MAX_MESSAGE_SIZE)

    The max message size.

  • io (IO)

    Stream containing a message

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :deserialize_as_bson (Boolean)

    Whether to deserialize this message using BSON types instead of native Ruby types wherever possible.

  • :socket_timeout (Numeric)

    The timeout to use for each read operation.

Returns:

  • (Message)

    Instance of a Message class

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 238

def self.deserialize(io,
  max_message_size = MAX_MESSAGE_SIZE,
  expected_response_to = nil,
  options = {}
)
  # io is usually a Mongo::Socket instance, which supports the
  # timeout option. For compatibility with whoever might call this
  # method with some other IO-like object, pass options only when they
  # are not empty.
  read_options = {}
  if timeout = options[:socket_timeout]
    read_options[:timeout] = timeout
  end

  if read_options.empty?
    chunk = io.read(16)
  else
    chunk = io.read(16, **read_options)
  end
  buf = BSON::ByteBuffer.new(chunk)
  length, _request_id, response_to, _op_code = deserialize_header(buf)

  # Protection from potential DOS man-in-the-middle attacks. See
  # DRIVERS-276.
  if length > (max_message_size || MAX_MESSAGE_SIZE)
    raise Error::MaxMessageSize.new(max_message_size)
  end

  # Protection against returning the response to a previous request. See
  # RUBY-1117
  if expected_response_to && response_to != expected_response_to
    raise Error::UnexpectedResponse.new(expected_response_to, response_to)
  end

  if read_options.empty?
    chunk = io.read(length - 16)
  else
    chunk = io.read(length - 16, **read_options)
  end
  buf = BSON::ByteBuffer.new(chunk)

  message = Registry.get(_op_code).allocate
  message.send(:fields).each do |field|
    if field[:multi]
      deserialize_array(message, buf, field, options)
    else
      deserialize_field(message, buf, field, options)
    end
  end
  if message.is_a?(Msg)
    message.fix_after_deserialization
  end
  message.maybe_inflate
end

.deserialize_array(message, io, field, options = {}) ⇒ Message (private)

Deserializes an array of fields in a message

The number of items in the array must be described by a previously deserialized field specified in the class by the field dsl under the key :multi

Parameters:

  • message (Message)

    Message to contain the deserialized array.

  • io (IO)

    Stream containing the array to deserialize.

  • field (Hash)

    Hash representing a field.

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :deserialize_as_bson (Boolean)

    Whether to deserialize each of the elements in this array using BSON types wherever possible.

Returns:

  • (Message)

    Message with deserialized array.

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 435

def self.deserialize_array(message, io, field, options = {})
  elements = []
  count = message.instance_variable_get(field[:multi])
  count.times { elements << field[:type].deserialize(io, options) }
  message.instance_variable_set(field[:name], elements)
end

.deserialize_field(message, io, field, options = {}) ⇒ Message (private)

Deserializes a single field in a message

Parameters:

  • message (Message)

    Message to contain the deserialized field.

  • io (IO)

    Stream containing the field to deserialize.

  • field (Hash)

    Hash representing a field.

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :deserialize_as_bson (Boolean)

    Whether to deserialize this field using BSON types wherever possible.

Returns:

  • (Message)

    Message with deserialized field.

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 453

def self.deserialize_field(message, io, field, options = {})
  message.instance_variable_set(
    field[:name],
    field[:type].deserialize(io, options)
  )
end

.deserialize_header(io) ⇒ Array<Fixnum> (private)

Deserializes the header of the message

Parameters:

  • io (IO)

    Stream containing the header.

Returns:

  • (Array<Fixnum>)

    Deserialized header.

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 391

def self.deserialize_header(io)
  Header.deserialize(io)
end

.field(name, type, multi = false) ⇒ NilClass (private)

A method for declaring a message field

Parameters:

  • name (String)

    Name of the field

  • type (Module)

    Type specific serialization strategies

  • multi (true, false, Symbol) (defaults to: false)

    Specify as true to serialize the field’s value as an array of type :type or as a symbol describing the field having the number of items in the array (used upon deserialization)

    Note: In fields where multi is a symbol representing the field
    containing number items in the repetition, the field containing
    that information *must* be deserialized prior to deserializing
    fields that use the number.
[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 410

def self.field(name, type, multi = false)
  fields << {
    :name => "@#{name}".intern,
    :type => type,
    :multi => multi
  }

  attr_reader name
end

.fieldsInteger (private)

A class method for getting the fields for a message class

Returns:

  • (Integer)

    the fields for the message class

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 343

def self.fields
  @fields ||= []
end

Instance Attribute Details

#replyable?false (readonly)

The default for messages is not to require a reply after sending a message to the server.

Examples:

Does the message require a reply?

message.replyable?

Returns:

  • (false)

    The default is to not require a reply.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 97

def replyable?
  false
end

#request_idFixnum (readonly)

Returns the request id for the message

Returns:

  • (Fixnum)

    The request id for this message

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 86

attr_reader :request_id

Instance Method Details

#==(other) ⇒ true, false Also known as: #eql?

Tests for equality between two wire protocol messages by comparing class and field values.

Parameters:

  • other (Message)

    The wire protocol message.

Returns:

  • (true, false)

    The equality of the messages.

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 298

def ==(other)
  return false if self.class != other.class
  fields.all? do |field|
    name = field[:name]
    instance_variable_get(name) ==
      other.instance_variable_get(name)
  end
end

#compress_if_possible(command_name, compressor, zlib_compression_level) ⇒ Message (private)

Compress the message, if the command being sent permits compression. Otherwise returns self.

Parameters:

  • command_name (String)

    Command name extracted from the message.

  • compressor (String | Symbol)

    The compressor to use.

  • zlib_compression_level (Integer)

    Zlib compression level to use.

Returns:

  • (Message)

    A Protocol::Compressed message or self, depending on whether this message can be compressed.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 127

private def compress_if_possible(command_name, compressor, zlib_compression_level)
  if compressor && compression_allowed?(command_name)
    Compressed.new(self, compressor, zlib_compression_level)
  else
    self
  end
end

#eql?(other)

Alias for #==.

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 306

alias_method :eql?, :==

#fieldsInteger (private)

A method for getting the fields for a message class

Returns:

  • (Integer)

    the fields for the message class

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 336

def fields
  self.class.fields
end

#hashFixnum

Creates a hash from the values of the fields of a message.

Returns:

  • (Fixnum)

    The hash code for the message.

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 311

def hash
  fields.map { |field| instance_variable_get(field[:name]) }.hash
end

#maybe_add_server_api(server_api)

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 173

def maybe_add_server_api(server_api)
  raise Error::ServerApiNotSupported, "Server API parameters cannot be sent to pre-3.6 MongoDB servers. Please remove the :server_api parameter from Client options or use MongoDB 3.6 or newer"
end

#maybe_compress(compressor, zlib_compression_level = nil) ⇒ self

This method is for internal use only.

Compress the message, if supported by the wire protocol used and if the command being sent permits compression. Otherwise returns self.

Parameters:

  • compressor (String, Symbol)

    The compressor to use.

  • zlib_compression_level (Integer) (defaults to: nil)

    The zlib compression level to use.

Returns:

  • (self)

    Always returns self. Other message types should override this method.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 112

def maybe_compress(compressor, zlib_compression_level = nil)
  self
end

#maybe_decrypt(context) ⇒ Mongo::Protocol::Msg

Possibly decrypt this message with libmongocrypt.

Parameters:

Returns:

  • (Mongo::Protocol::Msg)

    The decrypted message, or the original message if decryption was not possible or necessary.

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 152

def maybe_decrypt(context)
  # TODO determine if we should be decrypting data coming from pre-4.2
  # servers, potentially using legacy wire protocols. If so we need
  # to implement decryption for those wire protocols as our current
  # encryption/decryption code is OP_MSG-specific.
  self
end

#maybe_encrypt(connection, context) ⇒ Mongo::Protocol::Msg

Possibly encrypt this message with libmongocrypt.

Parameters:

Returns:

  • (Mongo::Protocol::Msg)

    The encrypted message, or the original message if encryption was not possible or necessary.

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 168

def maybe_encrypt(connection, context)
  # Do nothing if the Message subclass has not implemented this method
  self
end

#maybe_inflateProtocol::Message

This method is for internal use only.

Inflate a message if it is compressed.

Returns:

  • (Protocol::Message)

    Always returns self. Subclasses should override this method as necessary.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 142

def maybe_inflate
  self
end

#merge_sections (private)

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 177

private def merge_sections
  cmd = if @sections.length > 1
    cmd = @sections.detect { |section| section[:type] == 0 }[:payload]
    identifier = @sections.detect { |section| section[:type] == 1}[:payload][:identifier]
    cmd.merge(identifier.to_sym =>
      @sections.select { |section| section[:type] == 1 }.
        map { |section| section[:payload][:sequence] }.
        inject([]) { |arr, documents| arr + documents }
    )
  elsif @sections.first[:payload]
    @sections.first[:payload]
  else
    @sections.first
  end
  if cmd.nil?
    raise "The command should never be nil here"
  end
  cmd
end

#number_returned0

Default number returned value for protocol messages.

Returns:

  • (0)

    This method must be overridden, otherwise, always returns 0.

Since:

  • 2.5.0

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 329

def number_returned; 0; end

#serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil) ⇒ String Also known as: #to_s

Serializes message into bytes that can be sent on the wire

Parameters:

  • buffer (String) (defaults to: BSON::ByteBuffer.new)

    buffer where the message should be inserted

Returns:

  • (String)

    buffer containing the serialized message

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 201

def serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil)
  max_size =
    if max_bson_size && bson_overhead
      max_bson_size + bson_overhead
    elsif max_bson_size
      max_bson_size
    else
      nil
    end

  start = buffer.length
  serialize_header(buffer)
  serialize_fields(buffer, max_size)
  buffer.replace_int32(start, buffer.length - start)
end

#serialize_fields(buffer, max_bson_size = nil) ⇒ String (private)

Serializes message fields into a buffer

Parameters:

  • buffer (String)

    buffer to receive the field

Returns:

  • (String)

    buffer with serialized field

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 351

def serialize_fields(buffer, max_bson_size = nil)
  fields.each do |field|
    value = instance_variable_get(field[:name])
    if field[:multi]
      value.each do |item|
        if field[:type].respond_to?(:size_limited?)
          field[:type].serialize(buffer, item, max_bson_size)
        else
          field[:type].serialize(buffer, item)
        end
      end
    else
      if field[:type].respond_to?(:size_limited?)
        field[:type].serialize(buffer, value, max_bson_size)
      else
        field[:type].serialize(buffer, value)
      end
    end
  end
end

#serialize_header(buffer) ⇒ String (private)

Serializes the header of the message consisting of 4 32bit integers

The integers represent a message length placeholder (calculation of the actual length is deferred) the request id, the response to id, and the op code for the message

Currently uses hardcoded 0 for request id and response to as their values are irrelevent to the server

Parameters:

  • buffer (String)

    Buffer to receive the header

Returns:

  • (String)

    Serialized header

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 383

def serialize_header(buffer)
  Header.serialize(buffer, [0, request_id, 0, op_code])
end

#set_request_idFixnum

Generates a request id for a message

Returns:

  • (Fixnum)

    a request id used for sending a message to the server. The server will put this id in the response_to field of a reply.

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 320

def set_request_id
  @request_id = self.class.next_id
end

#to_s(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil)

Alias for #serialize.

[ GitHub ]

  
# File 'lib/mongo/protocol/message.rb', line 217

alias_method :to_s, :serialize