123456789_123456789_123456789_123456789_123456789_

Class: Mongo::Server::ConnectionBase

Relationships & Source Files
Extension / Inclusion / Inheritance Descendants
Subclasses:
Connection, Mongo::Server::PendingConnection
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
self, Forwardable, ConnectionCommon
Instance Chain:
Inherits: Mongo::Server::ConnectionCommon
Defined in: lib/mongo/server/connection_base.rb

Overview

Note:

Although methods of this module are part of the public API, the fact that these methods are defined on this module and not on the classes which include this module is not part of the public API.

This class encapsulates common connection functionality.

Since:

  • 2.0.0

Constant Summary

ConnectionCommon - Inherited

HELLO_DOC, LEGACY_HELLO_DOC

::Mongo::Loggable - Included

PREFIX

Instance Attribute Summary

::Mongo::Monitoring::Publishable - Included

ConnectionCommon - Inherited

#compressor

The compressor negotiated during the handshake for this connection, if any.

#connected?

Determine if the connection is currently connected.

#pid, #socket

Instance Method Summary

::Mongo::Monitoring::Publishable - Included

::Mongo::Loggable - Included

#log_debug

Convenience method to log debug messages with the standard prefix.

#log_error

Convenience method to log error messages with the standard prefix.

#log_fatal

Convenience method to log fatal messages with the standard prefix.

#log_info

Convenience method to log info messages with the standard prefix.

#log_warn

Convenience method to log warn messages with the standard prefix.

#logger

Get the logger instance.

#_mongo_log_prefix, #format_message

ConnectionCommon - Inherited

#handshake_command

Build a command that should be used for connection handshake.

#handshake_document

Build a document that should be used for connection handshake.

#add_server_diagnostics

Yields to the block and, if the block raises an exception, adds a note to the exception with the address of the specified server.

#ensure_connected, #set_compressor!, #ssl_options

Instance Attribute Details

#descriptionServer::Description (readonly)

This method is for internal use only.
Note:

A connection object that hasn’t yet connected (handshaken and authenticated, if authentication is required) does not have a description. While handshaking and authenticating the driver must be using global defaults, in particular not assuming that the properties of a particular connection are the same as properties of other connections made to the same address (since the server on the other end could have been shut down and a different server version could have been launched).

Returns the server description for this connection, derived from the hello response for the handshake performed on this connection.

Returns:

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/server/connection_base.rb', line 82

attr_reader :description

#optionsHash (readonly)

Returns:

  • (Hash)

    options The passed in options.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/server/connection_base.rb', line 53

attr_reader :options

#serverMongo::Address (readonly)

This method is for internal use only.

Returns:

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/server/connection_base.rb', line 58

attr_reader :server

Instance Method Details

#app_metadata

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/server/connection_base.rb', line 107

def 
  @app_metadata ||= begin
    same = true
    AppMetadata::AUTH_OPTION_KEYS.each do |key|
      if @server.options[key] != options[key]
        same = false
        break
      end
    end
    if same
      @server.
    else
      AppMetadata.new(options.merge(purpose: @server..purpose))
    end
  end
end

#check_timeout!(context) (private)

If timeoutMS is set for the operation context, checks whether there is enough time left to send the corresponding message to the server (remaining timeout is bigger than minimum round trip time for the server)

Parameters:

Raises:

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/server/connection_base.rb', line 288

def check_timeout!(context)
  return if [nil, 0].include?(context.deadline)

  time_to_execute = context.remaining_timeout_sec - server.minimum_round_trip_time
  if time_to_execute <= 0
    raise Mongo::Error::TimeoutError
  end
end

#deliver(message, context, options = {}) (private)

Raises:

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/server/connection_base.rb', line 167

def deliver(message, context, options = {})
  if Lint.enabled? && !@socket
    raise Error::LintError, "Trying to deliver a message over a disconnected connection (to #{address})"
  end
  buffer = serialize(message, context)
  check_timeout!(context)
  ensure_connected do |socket|
    operation_id = Monitoring.next_operation_id
    started_event = command_started(address, operation_id, message.payload,
      socket_object_id: socket.object_id, connection_id: id,
      connection_generation: generation,
      server_connection_id: description.server_connection_id,
      service_id: description.service_id,
    )
    start = Utils.monotonic_time
    result = nil
    begin
      result = add_server_diagnostics do
        socket.write(buffer.to_s, timeout: context.remaining_timeout_sec)
        if message.replyable?
          check_timeout!(context)
          Protocol::Message.deserialize(socket, max_message_size, message.request_id, options.merge(timeout: context.remaining_timeout_sec))
        else
          nil
        end
      end
    rescue Exception => e
      total_duration = Utils.monotonic_time - start
      command_failed(nil, address, operation_id, message.payload,
        e.message, total_duration,
        started_event: started_event,
        server_connection_id: description.server_connection_id,
        service_id: description.service_id,
      )
      raise
    else
      total_duration = Utils.monotonic_time - start
      command_completed(result, address, operation_id, message.payload,
        total_duration,
        started_event: started_event,
        server_connection_id: description.server_connection_id,
        service_id: description.service_id,
      )
    end
    if result && context.decrypt?
      result = result.maybe_decrypt(context)
    end
    result
  end
end

#dispatch(messages, context, options = {}) ⇒ Protocol::Message | nil

Note:

This method is named dispatch since ‘send’ is a core Ruby method on all objects.

Note:

For backwards compatibility, this method accepts the messages as an array. However, exactly one message must be given per invocation.

Dispatch a single message to the connection. If the message requires a response, a reply will be returned.

Examples:

Dispatch the message.

connection.dispatch([ insert ])

Parameters:

  • messages (Array<Message>)

    A one-element array containing the message to dispatch.

  • context (Operation::Context)

    The operation context.

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :deserialize_as_bson (Boolean)

    Whether to deserialize the response to this message using BSON objects in place of native Ruby types wherever possible.

Returns:

Raises:

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/server/connection_base.rb', line 150

def dispatch(messages, context, options = {})
  # The monitoring code does not correctly handle multiple messages,
  # and the driver internally does not send more than one message at
  # a time ever. Thus prohibit multiple message use for now.
  if messages.length != 1
    raise ArgumentError, 'Can only dispatch one message at a time'
  end
  if description.unknown?
    raise Error::InternalDriverError, "Cannot dispatch a message on a connection with unknown description: #{description.inspect}"
  end
  message = messages.first
  deliver(message, context, options)
end

#generationInteger | nil

Connection pool generation from which this connection was created. May be nil.

Returns:

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/server/connection_base.rb', line 100

def generation
  # If the connection is to a load balancer, @generation is set
  # after handshake completes. If the connection is to another server
  # type, generation is specified during connection creation.
  @generation || options[:generation]
end

#serialize(message, context, buffer = BSON::ByteBuffer.new) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/server/connection_base.rb', line 218

def serialize(message, context, buffer = BSON::ByteBuffer.new)
  # Driver specifications only mandate the fixed 16MiB limit for
  # serialized BSON documents. However, the server returns its
  # active serialized BSON document size limit in the hello response,
  # which is +max_bson_object_size+ below. The +DEFAULT_MAX_BSON_OBJECT_SIZE+
  # is the 16MiB value mandated by the specifications which we use
  # only as the default if the server's hello did not contain
  # maxBsonObjectSize.
  max_bson_size = max_bson_object_size || DEFAULT_MAX_BSON_OBJECT_SIZE
  if context.encrypt?
    # The client-side encryption specification requires bulk writes to
    # be split at a reduced maxBsonObjectSize. If this message is a bulk
    # write and its size exceeds the reduced size limit, the serializer
    # will raise an exception, which is caught by BulkWrite. BulkWrite
    # will split the operation into individual writes, which will
    # not be subject to the reduced maxBsonObjectSize.
    if message.bulk_write?
      # Make the new maximum size equal to the specified reduced size
      # limit plus the 16KiB overhead allowance.
      max_bson_size = REDUCED_MAX_BSON_SIZE
    end
  end

  # RUBY-2234: It is necessary to check that the message size does not
  # exceed the maximum bson object size before compressing and serializing
  # the final message.
  #
  # This is to avoid the case where the user performs a bulk write
  # larger than 16MiB which, when compressed, becomes smaller than 16MiB.
  # If the driver does not split the bulk writes prior to compression,
  # the entire operation will be sent to the server, which will raise an
  # error because the uncompressed operation exceeds the maximum bson size.
  #
  # To address this problem, we serialize the message prior to compression
  # and raise an exception if the serialized message exceeds the maximum
  # bson size.
  if max_message_size
    # Create a separate buffer that contains the un-compressed message
    # for the purpose of checking its size. Write any pre-existing contents
    # from the original buffer into the temporary one.
    temp_buffer = BSON::ByteBuffer.new

    # TODO: address the fact that this line mutates the buffer.
    temp_buffer.put_bytes(buffer.get_bytes(buffer.length))

    message.serialize(temp_buffer, max_bson_size, MAX_BSON_COMMAND_OVERHEAD)
    if temp_buffer.length > max_message_size
      raise Error::MaxMessageSize.new(max_message_size)
    end
  end

  # RUBY-2335: When the un-compressed message is smaller than the maximum
  # bson size limit, the message will be serialized twice. The operations
  # layer should be refactored to allow compression on an already-
  # serialized message.
  final_message = message.maybe_compress(compressor, options[:zlib_compression_level])
  final_message.serialize(buffer, max_bson_size, MAX_BSON_COMMAND_OVERHEAD)

  buffer
end

#service_idnil | Object

Returns:

  • (nil | Object)

    The service id, if any.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/server/connection_base.rb', line 92

def service_id
  description&.service_id
end