123456789_123456789_123456789_123456789_123456789_

Class: Mongo::Socket Private

Relationships & Source Files
Namespace Children
Modules:
Classes:
Extension / Inclusion / Inheritance Descendants
Subclasses:
Super Chains via Extension / Inclusion / Inheritance
Instance Chain:
self, Socket::Constants
Inherits: Object
Defined in: lib/mongo/socket.rb,
lib/mongo/socket/ocsp_cache.rb,
lib/mongo/socket/ocsp_verifier.rb,
lib/mongo/socket/ssl.rb,
lib/mongo/socket/tcp.rb,
lib/mongo/socket/unix.rb

Overview

Provides additional data around sockets for the driver’s use.

Since:

  • 2.0.0

Constant Summary

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Instance Attribute Details

#alive?true, false (readonly)

This method is for internal use only.
Deprecated.

Use #connectable? on the connection instead.

Is the socket connection alive?

Examples:

Is the socket alive?

socket.alive?

Returns:

  • (true, false)

    If the socket is alive.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 144

def alive?
  sock_arr = [ @socket ]
  if Kernel::select(sock_arr, nil, sock_arr, 0)
    # The eof? call is supposed to return immediately since select
    # indicated the socket is readable. However, if @socket is a TLS
    # socket, eof? can block anyway - see RUBY-2140.
    begin
      Timeout.timeout(0.1) do
        eof?
      end
    rescue ::Timeout::Error
      true
    end
  else
    true
  end
end

#connectable?true (readonly)

This method is for internal use only.
Deprecated.

For backwards compatibility only, do not use.

Returns:

  • (true)

    Always true.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 270

def connectable?
  true
end

#eof?Boolean (readonly)

This method is for internal use only.

Tests if this socket has reached EOF. Primarily used for liveness checks.

Since:

  • 2.0.5

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 259

def eof?
  @socket.eof?
rescue IOError, SystemCallError
  true
end

#familyInteger (readonly)

This method is for internal use only.

Returns:

  • (Integer)

    family The type of host family.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 78

attr_reader :family

#monitor?true | false (readonly)

This method is for internal use only.

Returns:

  • (true | false)

    Whether this socket was created by a monitoring connection.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 108

def monitor?
  !!options[:monitor]
end

#optionsHash (readonly)

This method is for internal use only.

Returns:

  • (Hash)

    The options.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 84

attr_reader :options

#socketSocket (readonly)

This method is for internal use only.

Returns:

  • (Socket)

    socket The wrapped socket.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 81

attr_reader :socket

#timeoutFloat (readonly)

This method is for internal use only.

Returns:

  • (Float)

    timeout The socket timeout.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 87

attr_reader :timeout

Instance Method Details

#allocate_string(capacity) (private)

This method is for internal use only.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 453

def allocate_string(capacity)
  String.new('', :capacity => capacity, :encoding => 'BINARY')
end

#closetrue

This method is for internal use only.

Close the socket.

Examples:

Close the socket.

socket.close

Returns:

  • (true)

    Always true.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 170

def close
  begin
    # Sometimes it seems the close call can hang for a long time
    ::Timeout.timeout(5) do
      @socket&.close
    end
  rescue
    # Silence all errors
  end
  true
end

#connection_addressAddress

This method is for internal use only.

Returns:

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 92

def connection_address
  options[:connection_address]
end

#connection_generationInteger

This method is for internal use only.

Returns:

  • (Integer)

    Generation of the connection (for non-monitoring connections) that created this socket.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 100

def connection_generation
  options[:connection_generation]
end

#do_write(*args, timeout: nil) ⇒ Integer (private)

This method is for internal use only.

Writes data to the socket instance.

This is a separate method from #write for ease of mocking in the tests. This method should not perform any exception mapping, upstream code sholud map exceptions.

Parameters:

  • args (Array<Object>)

    The data to be written.

  • :timeout (Numeric)

    The total timeout to the whole write operation.

Returns:

  • (Integer)

    The length of bytes written to the socket.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 473

def do_write(*args, timeout: nil)
  if timeout.nil?
    write_without_timeout(*args)
  else
    write_with_timeout(*args, timeout: timeout)
  end
end

#gets(*args) ⇒ Object

This method is for internal use only.

Delegates gets to the underlying socket.

Examples:

Get the next line.

socket.gets(10)

Parameters:

  • args (Array<Object>)

    The arguments to pass through.

Returns:

  • (Object)

    The returned bytes.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 192

def gets(*args)
  map_exceptions do
    @socket.gets(*args)
  end
end

#human_address (private)

This method is for internal use only.

Raises:

  • (NotImplementedError)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 619

def human_address
  raise NotImplementedError
end

#map_exceptions (private)

This method is for internal use only.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 607

def map_exceptions
  begin
    yield
  rescue Errno::ETIMEDOUT => e
    raise Error::SocketTimeoutError, "#{e.class}: #{e} (for #{human_address})"
  rescue IOError, SystemCallError, ::SocketError => e
    raise Error::SocketError, "#{e.class}: #{e} (for #{human_address})"
  rescue OpenSSL::SSL::SSLError => e
    raise Error::SocketError, "#{e.class}: #{e} (for #{human_address})"
  end
end

#pipeIO

This method is for internal use only.

listen on during the select system call when reading from the socket.

Returns:

  • (IO)

    The file descriptor for the read end of the pipe to

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 115

def pipe
  options[:pipe]
end

#raise_timeout_error!(message = nil, csot = false) (private)

This method is for internal use only.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 623

def raise_timeout_error!(message = nil, csot = false)
  if csot
    raise Mongo::Error::TimeoutError
  else
    raise Errno::ETIMEDOUT, message
  end
end

#read(length, socket_timeout: nil, timeout: nil) ⇒ Object

This method is for internal use only.

Will read all data from the socket for the provided number of bytes. If no data is returned, an exception will be raised.

Examples:

Read all the requested data from the socket.

socket.read(4096)

Parameters:

  • length (Integer)

    The number of bytes to read.

  • socket_timeout (Numeric)

    The timeout to use for each chunk read, mutually exclusive to #timeout.

  • timeout (Numeric)

    The total timeout to the whole read operation, mutually exclusive to socket_timeout.

Returns:

  • (Object)

    The data from the socket.

Raises:

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 215

def read(length, socket_timeout: nil, timeout: nil)
  if !socket_timeout.nil? && !timeout.nil?
    raise ArgumentError, 'Both timeout and socket_timeout cannot be set'
  end
  if !socket_timeout.nil? || timeout.nil?
    read_without_timeout(length, socket_timeout)
  else
    read_with_timeout(length, timeout)
  end
end

#read_buffer_size (private)

This method is for internal use only.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 457

def read_buffer_size
  # Buffer size for non-TLS reads
  # 64kb
  65536
end

#read_from_socket(length, socket_timeout: nil, csot: false) ⇒ Object (private)

This method is for internal use only.

Reads the length bytes from the socket. The read operation may involve multiple socket reads, each read is limited to #timeout second, if the parameter is provided.

Parameters:

  • length (Integer)

    The number of bytes to read.

  • :socket_timeout (Numeric)

    The timeout to use for each chunk read.

  • :csot (true | false)

    Whether the CSOT timeout is set for the operation.

Returns:

  • (Object)

    The data from the socket.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 334

def read_from_socket(length, socket_timeout: nil, csot: false)
  # Just in case
  if length == 0
    return ''.force_encoding('BINARY')
  end

  _timeout = socket_timeout || self.timeout
  if _timeout
    if _timeout > 0
      deadline = Utils.monotonic_time + _timeout
    elsif _timeout < 0
      raise_timeout_error!("Negative timeout #{_timeout} given to socket", csot)
    end
  end

  # We want to have a fixed and reasonably small size buffer for reads
  # because, for example, OpenSSL reads in 16 kb chunks max.
  # Having a 16 mb buffer means there will be 1000 reads each allocating
  # 16 mb of memory and using 16 kb of it.
  buf_size = read_buffer_size
  data = nil

  # If we want to read less than the buffer size, just allocate the
  # memory that is necessary
  if length < buf_size
    buf_size = length
  end

  # The binary encoding is important, otherwise Ruby performs encoding
  # conversions of some sort during the write into the buffer which
  # kills performance
  buf = allocate_string(buf_size)
  retrieved = 0
  begin
    while retrieved < length
      retrieve = length - retrieved
      if retrieve > buf_size
        retrieve = buf_size
      end
      chunk = @socket.read_nonblock(retrieve, buf)

      # If we read the entire wanted length in one operation,
      # return the data as is which saves one memory allocation and
      # one copy per read
      if retrieved == 0 && chunk.length == length
        return chunk
      end

      # If we are here, we are reading the wanted length in
      # multiple operations. Allocate the total buffer here rather
      # than up front so that the special case above won't be
      # allocating twice
      if data.nil?
        data = allocate_string(length)
      end

      # ... and we need to copy the chunks at this point
      data[retrieved, chunk.length] = chunk
      retrieved += chunk.length
    end
  # As explained in https://ruby-doc.com/core-trunk/IO.html#method-c-select,
  # reading from a TLS socket may require writing which may raise WaitWritable
  rescue IO::WaitReadable, IO::WaitWritable => exc
    if deadline
      select_timeout = deadline - Utils.monotonic_time
      if select_timeout <= 0
        raise_timeout_error!("Took more than #{_timeout} seconds to receive data", csot)
      end
    end
    if exc.is_a?(IO::WaitReadable)
      if pipe
        select_args = [[@socket, pipe], nil, [@socket, pipe], select_timeout]
      else
        select_args = [[@socket], nil, [@socket], select_timeout]
      end
    else
      select_args = [nil, [@socket], [@socket], select_timeout]
    end

    rv = Kernel.select(*select_args)
    if Lint.enabled?
      if pipe && rv&.include?(pipe)
        # If the return value of select is the read end of the pipe, and
        # an IOError is not raised, then that means the socket is still
        # open. Select is interrupted be closing the write end of the
        # pipe, which either returns the pipe if the socket is open, or
        # raises an IOError if it isn't. Select is interrupted after all
        # of the pending and checked out connections have been interrupted
        # and closed, and this only happens once the pool is cleared with
        # interrupt_in_use connections flag. This means that in order for
        # the socket to still be open when the select is interrupted, and
        # that socket is being read from, that means after clear was
        # called, a connection from the previous generation was checked
        # out of the pool, for reading on its socket. This should be impossible.
        raise Mongo::LintError, "Select interrupted for live socket. This should be impossible."
      end
    end

    if BSON::Environment.jruby?
      # Ignore the return value of Kernel.select.
      # On JRuby, select appears to return nil prior to timeout expiration
      # (apparently due to a EAGAIN) which then causes us to fail the read
      # even though we could have retried it.
      # Check the deadline ourselves.
      if deadline
        select_timeout = deadline - Utils.monotonic_time
        if select_timeout <= 0
          raise_timeout_error!("Took more than #{_timeout} seconds to receive data", csot)
        end
      end
    elsif rv.nil?
      raise_timeout_error!("Took more than #{_timeout} seconds to receive data (select call timed out)", csot)
    end
    retry
  end

  data
end

#read_with_timeout(length, timeout) ⇒ Object (private)

This method is for internal use only.

Reads the length bytes from the socket, the read operation duration is limited to #timeout second.

Parameters:

  • length (Integer)

    The number of bytes to read.

  • timeout (Numeric)

    The total timeout to the whole read operation.

Returns:

  • (Object)

    The data from the socket.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 283

def read_with_timeout(length, timeout)
  deadline = Utils.monotonic_time + timeout
  map_exceptions do
    String.new.tap do |data|
      while data.length < length
        socket_timeout = deadline - Utils.monotonic_time
        if socket_timeout <= 0
          raise Mongo::Error::TimeoutError
        end
        chunk = read_from_socket(length - data.length, socket_timeout: socket_timeout, csot: true)
        unless chunk.length > 0
          raise IOError, "Expected to read > 0 bytes but read 0 bytes"
        end
        data << chunk
      end
    end
  end
end

#read_without_timeout(length, socket_timeout = nil) ⇒ Object (private)

This method is for internal use only.

Reads the length bytes from the socket. The read operation may involve multiple socket reads, each read is limited to #timeout second, if the parameter is provided.

Parameters:

  • length (Integer)

    The number of bytes to read.

  • socket_timeout (Numeric) (defaults to: nil)

    The timeout to use for each chunk read.

Returns:

  • (Object)

    The data from the socket.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 310

def read_without_timeout(length, socket_timeout = nil)
  map_exceptions do
    String.new.tap do |data|
      while data.length < length
        chunk = read_from_socket(length - data.length, socket_timeout: socket_timeout)
        unless chunk.length > 0
          raise IOError, "Expected to read > 0 bytes but read 0 bytes"
        end
        data << chunk
      end
    end
  end
end

#readbyteObject

This method is for internal use only.

Read a single byte from the socket.

Examples:

Read a single byte.

socket.readbyte

Returns:

  • (Object)

    The read byte.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 234

def readbyte
  map_exceptions do
    @socket.readbyte
  end
end

#set_keepalive_opts(sock) (private)

This method is for internal use only.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 580

def set_keepalive_opts(sock)
  sock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, true)
  set_option(sock, :TCP_KEEPINTVL, DEFAULT_TCP_KEEPINTVL)
  set_option(sock, :TCP_KEEPCNT, DEFAULT_TCP_KEEPCNT)
  set_option(sock, :TCP_KEEPIDLE, DEFAULT_TCP_KEEPIDLE)
  set_option(sock, :TCP_USER_TIMEOUT, DEFAULT_TCP_USER_TIMEOUT)
rescue
  # JRuby 9.2.13.0 and lower do not define TCP_KEEPINTVL etc. constants.
  # JRuby 9.2.14.0 defines the constants but does not allow to get or
  # set them with this error:
  # Errno::ENOPROTOOPT: Protocol not available - Protocol not available
end

#set_option(sock, option, default) (private)

This method is for internal use only.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 593

def set_option(sock, option, default)
  if Socket.const_defined?(option)
    system_default = sock.getsockopt(IPPROTO_TCP, option).int
    if system_default > default
      sock.setsockopt(IPPROTO_TCP, option, default)
    end
  end
end

#set_socket_options(sock) (private)

This method is for internal use only.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 602

def set_socket_options(sock)
  sock.set_encoding(BSON::BINARY)
  set_keepalive_opts(sock)
end

#summaryString

This method is for internal use only.

Returns:

  • (String)

    Human-readable summary of the socket for debugging.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 122

def summary
  fileno = @socket&.fileno rescue '<no socket>' || '<no socket>'
  if monitor?
    indicator = if options[:push]
      'pm'
    else
      'm'
    end
    "#{connection_address};#{indicator};fd=#{fileno}"
  else
    "#{connection_address};c:#{connection_generation};fd=#{fileno}"
  end
end

#unix_socket?(sock) ⇒ Boolean (private)

This method is for internal use only.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 568

def unix_socket?(sock)
  defined?(UNIXSocket) && sock.is_a?(UNIXSocket)
end

#wait_for_socket_to_be_writable(deadline) (private)

This method is for internal use only.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 551

def wait_for_socket_to_be_writable(deadline)
  select_timeout = deadline - Utils.monotonic_time
  rv = Kernel.select(nil, [@socket], nil, select_timeout)

  if BSON::Environment.jruby?
    # Ignore the return value of Kernel.select.
    # On JRuby, select appears to return nil prior to timeout expiration
    # (apparently due to a EAGAIN) which then causes us to fail the read
    # even though we could have retried it.
    # Check the deadline ourselves.
    select_timeout = deadline - Utils.monotonic_time
    return select_timeout > 0
  end

  !rv.nil?
end

#write(*args, timeout: nil) ⇒ Integer

This method is for internal use only.

Writes data to the socket instance.

Parameters:

  • args (Array<Object>)

    The data to be written.

  • timeout (Numeric)

    The total timeout to the whole write operation.

Returns:

  • (Integer)

    The length of bytes written to the socket.

Raises:

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 250

def write(*args, timeout: nil)
  map_exceptions do
    do_write(*args, timeout: timeout)
  end
end

#write_chunk(chunk, timeout) (private)

This method is for internal use only.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 532

def write_chunk(chunk, timeout)
  deadline = Utils.monotonic_time + timeout

  written = 0
  while written < chunk.length
    begin
      written += @socket.write_nonblock(chunk[written..-1])
    rescue IO::WaitWritable, Errno::EINTR
      if !wait_for_socket_to_be_writable(deadline)
        raise_timeout_error!("Took more than #{timeout} seconds to receive data", true)
      end

      retry
    end
  end

  written
end

#write_with_timeout(*args, timeout:) ⇒ Integer (private)

This method is for internal use only.

Writes data to to the socket, the write duration is limited to #timeout.

Parameters:

  • args (Array<Object>)

    The data to be written.

  • :timeout (Numeric)

    The total timeout to the whole write operation.

Returns:

  • (Integer)

    The length of bytes written to the socket.

Raises:

  • (ArgumentError)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 515

def write_with_timeout(*args, timeout:)
  raise ArgumentError, 'timeout cannot be nil' if timeout.nil?
  raise_timeout_error!("Negative timeout #{timeout} given to socket", true) if timeout < 0

  written = 0
  args.each do |buf|
    buf = buf.to_s
    i = 0
    while i < buf.length
      chunk = buf[i...(i + WRITE_CHUNK_SIZE)]
      written += write_chunk(chunk, timeout)
      i += WRITE_CHUNK_SIZE
    end
  end
  written
end

#write_without_timeout(*args) ⇒ Integer (private)

This method is for internal use only.

Writes data to to the socket.

Parameters:

  • args (Array<Object>)

    The data to be written.

Returns:

  • (Integer)

    The length of bytes written to the socket.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 486

def write_without_timeout(*args)
  # This method used to forward arguments to @socket.write in a
  # single call like so:
  #
  # @socket.write(*args)
  #
  # Turns out, when each buffer to be written is large (e.g. 32 MiB),
  # this write call would take an extremely long time (20+ seconds)
  # while using 100% CPU. Splitting the writes into chunks produced
  # massively better performance (0.05 seconds to write the 32 MiB of
  # data on the same hardware). Unfortunately splitting the data,
  # one would assume, results in it being copied, but this seems to be
  # a much more minor issue compared to CPU cost of writing large buffers.
  args.each do |buf|
    buf = buf.to_s
    i = 0
    while i < buf.length
      chunk = buf[i, WRITE_CHUNK_SIZE]
      i += @socket.write(chunk)
    end
  end
end