Class: Mongo::Server::ConnectionBase
Relationships & Source Files | |
Extension / Inclusion / Inheritance Descendants | |
Subclasses:
Connection, Mongo::Server::PendingConnection
|
|
Super Chains via Extension / Inclusion / Inheritance | |
Class Chain:
self,
Forwardable,
ConnectionCommon
|
|
Instance Chain:
|
|
Inherits: |
Mongo::Server::ConnectionCommon
|
Defined in: | lib/mongo/server/connection_base.rb |
Overview
Although methods of this module are part of the public API, the fact that these methods are defined on this module and not on the classes which include this module is not part of the public API.
This class encapsulates common connection functionality.
Constant Summary
-
DEFAULT_MAX_BSON_OBJECT_SIZE =
Internal use only
The maximum allowed size in bytes that a user-supplied document may take up when serialized, if the server’s hello response does not include maxBsonObjectSize field.
The commands that are sent to the server may exceed this size by MAX_BSON_COMMAND_OVERHEAD.
16777216
-
MAX_BSON_COMMAND_OVERHEAD =
Internal use only
The additional overhead allowed for command data (i.e. fields added to the command document by the driver, as opposed to documents provided by the user) when serializing a complete command to BSON.
16384
-
REDUCED_MAX_BSON_SIZE =
Internal use only
2097152
ConnectionCommon
- Inherited
::Mongo::Loggable
- Included
Instance Attribute Summary
-
#description ⇒ Server::Description
readonly
Internal use only
Internal use only
Returns the server description for this connection, derived from the hello response for the handshake performed on this connection.
- #options ⇒ Hash readonly
- #server ⇒ Mongo::Address readonly Internal use only
::Mongo::Monitoring::Publishable
- Included
ConnectionCommon
- Inherited
#compressor | The compressor negotiated during the handshake for this connection, if any. |
#connected? | Determine if the connection is currently connected. |
#pid, #socket |
Instance Method Summary
- #app_metadata
-
#dispatch(messages, context, options = {}) ⇒ Protocol::Message | nil
Dispatch a single message to the connection.
-
#generation ⇒ Integer | nil
Connection
pool generation from which this connection was created. - #service_id ⇒ nil | Object
-
#check_timeout!(context)
private
If timeoutMS is set for the operation context, checks whether there is enough time left to send the corresponding message to the server (remaining timeout is bigger than minimum round trip time for the server).
- #deliver(message, context, options = {}) private
- #serialize(message, context, buffer = BSON::ByteBuffer.new) private
::Mongo::Monitoring::Publishable
- Included
#publish_cmap_event, #publish_event, #publish_sdam_event, #command_completed, #command_failed, #command_started, #command_succeeded, #duration |
::Mongo::Loggable
- Included
#log_debug | Convenience method to log debug messages with the standard prefix. |
#log_error | Convenience method to log error messages with the standard prefix. |
#log_fatal | Convenience method to log fatal messages with the standard prefix. |
#log_info | Convenience method to log info messages with the standard prefix. |
#log_warn | Convenience method to log warn messages with the standard prefix. |
#logger | Get the logger instance. |
#_mongo_log_prefix, #format_message |
ConnectionCommon
- Inherited
#handshake_command | Build a command that should be used for connection handshake. |
#handshake_document | Build a document that should be used for connection handshake. |
#add_server_diagnostics | Yields to the block and, if the block raises an exception, adds a note to the exception with the address of the specified server. |
#ensure_connected, #set_compressor!, #ssl_options |
Instance Attribute Details
#description ⇒ Server::Description (readonly)
A connection object that hasn’t yet connected (handshaken and authenticated, if authentication is required) does not have a description. While handshaking and authenticating the driver must be using global defaults, in particular not assuming that the properties of a particular connection are the same as properties of other connections made to the same address (since the server on the other end could have been shut down and a different server version could have been launched).
Returns the server description for this connection, derived from the hello response for the handshake performed on this connection.
# File 'lib/mongo/server/connection_base.rb', line 82
attr_reader :description
#options ⇒ Hash
(readonly)
# File 'lib/mongo/server/connection_base.rb', line 53
attr_reader :
#server ⇒ Mongo::Address (readonly)
# File 'lib/mongo/server/connection_base.rb', line 58
attr_reader :server
Instance Method Details
#app_metadata
# File 'lib/mongo/server/connection_base.rb', line 107
def @app_metadata ||= begin same = true AppMetadata::AUTH_OPTION_KEYS.each do |key| if @server. [key] != [key] same = false break end end if same @server. else AppMetadata.new( .merge(purpose: @server. .purpose)) end end end
#check_timeout!(context) (private)
If timeoutMS is set for the operation context, checks whether there is enough time left to send the corresponding message to the server (remaining timeout is bigger than minimum round trip time for the server)
# File 'lib/mongo/server/connection_base.rb', line 288
def check_timeout!(context) return if [nil, 0].include?(context.deadline) time_to_execute = context.remaining_timeout_sec - server.minimum_round_trip_time if time_to_execute <= 0 raise Mongo::Error::TimeoutError end end
#deliver(message, context, options = {}) (private)
# File 'lib/mongo/server/connection_base.rb', line 167
def deliver(, context, = {}) if Lint.enabled? && !@socket raise Error::LintError, "Trying to deliver a message over a disconnected connection (to #{address})" end buffer = serialize(, context) check_timeout!(context) ensure_connected do |socket| operation_id = Monitoring.next_operation_id started_event = command_started(address, operation_id, .payload, socket_object_id: socket.object_id, connection_id: id, connection_generation: generation, server_connection_id: description.server_connection_id, service_id: description.service_id, ) start = Utils.monotonic_time result = nil begin result = add_server_diagnostics do socket.write(buffer.to_s, timeout: context.remaining_timeout_sec) if .replyable? check_timeout!(context) Protocol::Message.deserialize(socket, , .request_id, .merge(timeout: context.remaining_timeout_sec)) else nil end end rescue Exception => e total_duration = Utils.monotonic_time - start command_failed(nil, address, operation_id, .payload, e., total_duration, started_event: started_event, server_connection_id: description.server_connection_id, service_id: description.service_id, ) raise else total_duration = Utils.monotonic_time - start command_completed(result, address, operation_id, .payload, total_duration, started_event: started_event, server_connection_id: description.server_connection_id, service_id: description.service_id, ) end if result && context.decrypt? result = result.maybe_decrypt(context) end result end end
#dispatch(messages, context, options = {}) ⇒ Protocol::Message | nil
This method is named dispatch since ‘send’ is a core Ruby method on all objects.
For backwards compatibility, this method accepts the messages as an array. However, exactly one message must be given per invocation.
Dispatch a single message to the connection. If the message requires a response, a reply will be returned.
# File 'lib/mongo/server/connection_base.rb', line 150
def dispatch(, context, = {}) # The monitoring code does not correctly handle multiple messages, # and the driver internally does not send more than one message at # a time ever. Thus prohibit multiple message use for now. if .length != 1 raise ArgumentError, 'Can only dispatch one message at a time' end if description.unknown? raise Error::InternalDriverError, "Cannot dispatch a message on a connection with unknown description: #{description.inspect}" end = .first deliver(, context, ) end
#generation ⇒ Integer
| nil
Connection
pool generation from which this connection was created. May be nil.
# File 'lib/mongo/server/connection_base.rb', line 100
def generation # If the connection is to a load balancer, @generation is set # after handshake completes. If the connection is to another server # type, generation is specified during connection creation. @generation || [:generation] end
#serialize(message, context, buffer = BSON::ByteBuffer.new) (private)
# File 'lib/mongo/server/connection_base.rb', line 218
def serialize(, context, buffer = BSON::ByteBuffer.new) # Driver specifications only mandate the fixed 16MiB limit for # serialized BSON documents. However, the server returns its # active serialized BSON document size limit in the hello response, # which is +max_bson_object_size+ below. The +DEFAULT_MAX_BSON_OBJECT_SIZE+ # is the 16MiB value mandated by the specifications which we use # only as the default if the server's hello did not contain # maxBsonObjectSize. max_bson_size = max_bson_object_size || DEFAULT_MAX_BSON_OBJECT_SIZE if context.encrypt? # The client-side encryption specification requires bulk writes to # be split at a reduced maxBsonObjectSize. If this message is a bulk # write and its size exceeds the reduced size limit, the serializer # will raise an exception, which is caught by BulkWrite. BulkWrite # will split the operation into individual writes, which will # not be subject to the reduced maxBsonObjectSize. if .bulk_write? # Make the new maximum size equal to the specified reduced size # limit plus the 16KiB overhead allowance. max_bson_size = REDUCED_MAX_BSON_SIZE end end # RUBY-2234: It is necessary to check that the message size does not # exceed the maximum bson object size before compressing and serializing # the final message. # # This is to avoid the case where the user performs a bulk write # larger than 16MiB which, when compressed, becomes smaller than 16MiB. # If the driver does not split the bulk writes prior to compression, # the entire operation will be sent to the server, which will raise an # error because the uncompressed operation exceeds the maximum bson size. # # To address this problem, we serialize the message prior to compression # and raise an exception if the serialized message exceeds the maximum # bson size. if # Create a separate buffer that contains the un-compressed message # for the purpose of checking its size. Write any pre-existing contents # from the original buffer into the temporary one. temp_buffer = BSON::ByteBuffer.new # TODO: address the fact that this line mutates the buffer. temp_buffer.put_bytes(buffer.get_bytes(buffer.length)) .serialize(temp_buffer, max_bson_size, MAX_BSON_COMMAND_OVERHEAD) if temp_buffer.length > raise Error::MaxMessageSize.new( ) end end # RUBY-2335: When the un-compressed message is smaller than the maximum # bson size limit, the message will be serialized twice. The operations # layer should be refactored to allow compression on an already- # serialized message. = .maybe_compress(compressor, [:zlib_compression_level]) .serialize(buffer, max_bson_size, MAX_BSON_COMMAND_OVERHEAD) buffer end
#service_id ⇒ nil
| Object
# File 'lib/mongo/server/connection_base.rb', line 92
def service_id description&.service_id end