Class: Mongo::Socket Private

Do not use. This class is for internal use only.
Relationships & Source Files
Instance Chain: self, Socket::Constants
Inherits: Object
Defined in: lib/mongo/socket.rb,
lib/mongo/socket/ocsp_cache.rb,
lib/mongo/socket/ocsp_verifier.rb,
lib/mongo/socket/ssl.rb,
lib/mongo/socket/tcp.rb,
lib/mongo/socket/unix.rb

Overview

Provides additional data around sockets for the driver’s use.

Since:

  • 2.0.0

Instance Attribute Details

#alive? ⇒ true, false (readonly)

Deprecated.

Use #connectable? on the connection instead.

Is the socket connection alive?

Examples:

Is the socket alive?

socket.alive?

Returns:

  • (true, false)

    If the socket is alive.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 134

def alive?
  sock_arr = [ @socket ]
  if Kernel::select(sock_arr, nil, sock_arr, 0)
    # The eof? call is supposed to return immediately since select
    # indicated the socket is readable. However, if @socket is a TLS
    # socket, eof? can block anyway - see RUBY-2140.
    begin
      Timeout.timeout(0.1) do
        eof?
      end
    rescue ::Timeout::Error
      true
    end
  else
    true
  end
end
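
The select-then-eof? pattern in the listing can be tried outside the driver. The following is a minimal sketch, assuming a plain UNIXSocket.pair stands in for the wrapped socket; socket_alive? is a hypothetical helper name and not part of the driver. Note one deliberate difference: the helper negates eof? so that true means the peer has not closed the connection, whereas the listing above returns the result of eof? directly.

```ruby
require 'socket'
require 'timeout'

# Hypothetical helper, not part of the driver: returns true while there is
# no evidence that the peer has closed its end of the connection.
def socket_alive?(sock)
  ready = IO.select([sock], nil, [sock], 0)
  # select timed out immediately: nothing to read and no error condition.
  return true if ready.nil?

  begin
    # eof? can block on some socket types, so bound it with a short timeout.
    Timeout.timeout(0.1) { !sock.eof? }
  rescue Timeout::Error
    true
  rescue IOError, SystemCallError
    false
  end
end

left, right = UNIXSocket.pair
before_close = socket_alive?(left)  # peer still open
right.close
after_close = socket_alive?(left)   # peer closed, left is at EOF
left.close
```

UNIXSocket.pair is used only to keep the example self-contained; it is not available on every platform.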

#connectable? ⇒ true (readonly)

Deprecated.

For backwards compatibility only, do not use.

Returns:

  • (true)

    Always true.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 260

def connectable?
  true
end

#eof? ⇒ Boolean (readonly)

Tests if this socket has reached EOF. Primarily used for liveness checks.

Since:

  • 2.0.5

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 249

def eof?
  @socket.eof?
rescue IOError, SystemCallError
  true
end

#family ⇒ Integer (readonly)

Returns:

  • (Integer)

    The type of host family.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 75

attr_reader :family

#monitor? ⇒ true | false (readonly)

Returns:

  • (true | false)

    Whether this socket was created by a monitoring connection.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 105

def monitor?
  !!options[:monitor]
end

#options ⇒ Hash (readonly)

Returns:

  • (Hash)

    The options.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 81

attr_reader :options

#socket ⇒ Socket (readonly)

Returns:

  • (Socket)

    The wrapped socket.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 78

attr_reader :socket

#timeout ⇒ Float (readonly)

Returns:

  • (Float)

    The socket timeout.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 84

attr_reader :timeout

Instance Method Details

#allocate_string(capacity) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 444

def allocate_string(capacity)
  String.new('', :capacity => capacity, :encoding => 'BINARY')
end

#close ⇒ true

Close the socket.

Examples:

Close the socket.

socket.close

Returns:

  • (true)

    Always true.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 160

def close
  begin
    # Sometimes it seems the close call can hang for a long time
    ::Timeout.timeout(5) do
      @socket.close
    end
  rescue
    # Silence all errors
  end
  true
end

#connection_address ⇒ Address

Returns:

  • (Address)

    The address of the connection that created this socket.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 89

def connection_address
  options[:connection_address]
end

#connection_generation ⇒ Integer

Returns:

  • (Integer)

    Generation of the connection (for non-monitoring connections) that created this socket.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 97

def connection_generation
  options[:connection_generation]
end

#do_write(*args, timeout: nil) ⇒ Integer (private)

Writes data to the socket instance.

This is a separate method from #write for ease of mocking in the tests. This method should not perform any exception mapping; upstream code should map exceptions.

Parameters:

  • args (Array<Object>)

    The data to be written.

  • :timeout (Numeric)

    The total timeout for the whole write operation.

Returns:

  • (Integer)

    The number of bytes written to the socket.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 464

def do_write(*args, timeout: nil)
  if timeout.nil?
    write_without_timeout(*args)
  else
    write_with_timeout(*args, timeout: timeout)
  end
end

#gets(*args) ⇒ Object

Delegates gets to the underlying socket.

Examples:

Get the next line.

socket.gets(10)

Parameters:

  • args (Array<Object>)

    The arguments to pass through.

Returns:

  • (Object)

    The returned bytes.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 182

def gets(*args)
  map_exceptions do
    @socket.gets(*args)
  end
end

#human_address (private)

Raises:

  • (NotImplementedError)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 603

def human_address
  raise NotImplementedError
end

#map_exceptions (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 591

def map_exceptions
  begin
    yield
  rescue Errno::ETIMEDOUT => e
    raise Error::SocketTimeoutError, "#{e.class}: #{e} (for #{human_address})"
  rescue IOError, SystemCallError => e
    raise Error::SocketError, "#{e.class}: #{e} (for #{human_address})"
  rescue OpenSSL::SSL::SSLError => e
    raise Error::SocketError, "#{e.class}: #{e} (for #{human_address})"
  end
end
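
The order of the rescue clauses in the listing matters, because Errno::ETIMEDOUT is itself a SystemCallError. The following is a minimal sketch of the same mapping idea, assuming hypothetical stand-in error classes (SketchSocketError and SketchSocketTimeoutError) in place of the driver's own, and a plain address string in place of human_address.

```ruby
# Hypothetical stand-ins for the driver's error classes. Making the timeout
# error a subclass of the socket error is an assumption of this sketch.
class SketchSocketError < StandardError; end
class SketchSocketTimeoutError < SketchSocketError; end

# Runs the block and re-raises low-level errors as the sketch's own classes.
def map_socket_errors(address)
  yield
rescue Errno::ETIMEDOUT => e
  # Must come before SystemCallError: Errno::ETIMEDOUT is a subclass of it,
  # so the broader clause would otherwise catch timeouts first.
  raise SketchSocketTimeoutError, "#{e.class}: #{e} (for #{address})"
rescue IOError, SystemCallError => e
  raise SketchSocketError, "#{e.class}: #{e} (for #{address})"
end

# Returns the class of the mapped error, or nil if nothing was raised.
def mapped_class(address, &block)
  map_socket_errors(address, &block)
  nil
rescue SketchSocketError => e
  e.class
end

timeout_class = mapped_class('localhost:27017') { raise Errno::ETIMEDOUT }
reset_class   = mapped_class('localhost:27017') { raise Errno::ECONNRESET }
closed_class  = mapped_class('localhost:27017') { raise IOError, 'closed stream' }
```

Here timeout_class is the timeout class, while the other two map to the generic socket error class.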

#raise_timeout_error!(message = nil, csot = false) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 607

def raise_timeout_error!(message = nil, csot = false)
  if csot
    raise Mongo::Error::TimeoutError
  else
    raise Errno::ETIMEDOUT, message
  end
end

#read(length, socket_timeout: nil, timeout: nil) ⇒ Object

Reads the requested number of bytes from the socket. If a read returns no data, an exception is raised.

Examples:

Read all the requested data from the socket.

socket.read(4096)

Parameters:

  • length (Integer)

    The number of bytes to read.

  • socket_timeout (Numeric)

    The timeout to use for each chunk read; mutually exclusive with timeout.

  • timeout (Numeric)

    The total timeout for the whole read operation; mutually exclusive with socket_timeout.

Returns:

  • (Object)

    The data from the socket.

Raises:

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 205

def read(length, socket_timeout: nil, timeout: nil)
  if !socket_timeout.nil? && !timeout.nil?
    raise ArgumentError, 'Both timeout and socket_timeout cannot be set'
  end
  if !socket_timeout.nil? || timeout.nil?
    read_without_timeout(length, socket_timeout)
  else
    read_with_timeout(length, timeout)
  end
end
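
The branch logic of #read can be isolated to see which combination of arguments selects which path. This is a minimal sketch; read_strategy and the symbols it returns are hypothetical names used only for illustration.

```ruby
# Hypothetical helper mirroring the argument checks in the listing above.
# Returns :per_chunk where the listing calls read_without_timeout and
# :total_deadline where it calls read_with_timeout.
def read_strategy(socket_timeout: nil, timeout: nil)
  if !socket_timeout.nil? && !timeout.nil?
    raise ArgumentError, 'Both timeout and socket_timeout cannot be set'
  end

  if !socket_timeout.nil? || timeout.nil?
    :per_chunk
  else
    :total_deadline
  end
end

default_path = read_strategy                      # neither argument given
chunk_path   = read_strategy(socket_timeout: 5)   # per-chunk timeout
total_path   = read_strategy(timeout: 5)          # overall deadline
```

With neither argument given, the per-chunk path is taken, and the total-deadline path is used only when timeout alone is set.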

#read_buffer_size (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 448

def read_buffer_size
  # Buffer size for non-TLS reads
  # 64kb
  65536
end

#read_from_socket(length, socket_timeout: nil, csot: false) ⇒ Object (private)

Reads length bytes from the socket. The read operation may involve multiple socket reads; each read is limited to socket_timeout seconds, falling back to #timeout when the parameter is not provided.

Parameters:

  • length (Integer)

    The number of bytes to read.

  • :socket_timeout (Numeric)

    The timeout to use for each chunk read.

  • :csot (true | false)

    Whether the CSOT timeout is set for the operation.

Returns:

  • (Object)

    The data from the socket.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 324

def read_from_socket(length, socket_timeout: nil, csot: false)
  # Just in case
  if length == 0
    return ''.force_encoding('BINARY')
  end

  _timeout = socket_timeout || self.timeout
  if _timeout
    if _timeout > 0
      deadline = Utils.monotonic_time + _timeout
    elsif _timeout < 0
      raise_timeout_error!("Negative timeout #{_timeout} given to socket", csot)
    end
  end

  # We want to have a fixed and reasonably small size buffer for reads
  # because, for example, OpenSSL reads in 16 kb chunks max.
  # Having a 16 mb buffer means there will be 1000 reads each allocating
  # 16 mb of memory and using 16 kb of it.
  buf_size = read_buffer_size
  data = nil

  # If we want to read less than the buffer size, just allocate the
  # memory that is necessary
  if length < buf_size
    buf_size = length
  end

  # The binary encoding is important, otherwise Ruby performs encoding
  # conversions of some sort during the write into the buffer which
  # kills performance
  buf = allocate_string(buf_size)
  retrieved = 0
  begin
    while retrieved < length
      retrieve = length - retrieved
      if retrieve > buf_size
        retrieve = buf_size
      end
      chunk = @socket.read_nonblock(retrieve, buf)

      # If we read the entire wanted length in one operation,
      # return the data as is which saves one memory allocation and
      # one copy per read
      if retrieved == 0 && chunk.length == length
        return chunk
      end

      # If we are here, we are reading the wanted length in
      # multiple operations. Allocate the total buffer here rather
      # than up front so that the special case above won't be
      # allocating twice
      if data.nil?
        data = allocate_string(length)
      end

      # ... and we need to copy the chunks at this point
      data[retrieved, chunk.length] = chunk
      retrieved += chunk.length
    end
  # As explained in https://ruby-doc.com/core-trunk/IO.html#method-c-select,
  # reading from a TLS socket may require writing which may raise WaitWritable
  rescue IO::WaitReadable, IO::WaitWritable => exc
    if deadline
      select_timeout = deadline - Utils.monotonic_time
      if select_timeout <= 0
        raise_timeout_error!("Took more than #{_timeout} seconds to receive data", csot)
      end
    end
    pipe = options[:pipe]
    if exc.is_a?(IO::WaitReadable)
      if pipe
        select_args = [[@socket, pipe], nil, [@socket, pipe], select_timeout]
      else
        select_args = [[@socket], nil, [@socket], select_timeout]
      end
    else
      select_args = [nil, [@socket], [@socket], select_timeout]
    end

    rv = Kernel.select(*select_args)
    if Lint.enabled?
      if pipe && rv&.include?(pipe)
        # If the return value of select is the read end of the pipe, and
        # an IOError is not raised, then that means the socket is still
        # open. Select is interrupted by closing the write end of the
        # pipe, which either returns the pipe if the socket is open, or
        # raises an IOError if it isn't. Select is interrupted after all
        # of the pending and checked out connections have been interrupted
        # and closed, and this only happens once the pool is cleared with
        # interrupt_in_use connections flag. This means that in order for
        # the socket to still be open when the select is interrupted, and
        # that socket is being read from, that means after clear was
        # called, a connection from the previous generation was checked
        # out of the pool, for reading on its socket. This should be impossible.
        raise Mongo::LintError, "Select interrupted for live socket. This should be impossible."
      end
    end

    if BSON::Environment.jruby?
      # Ignore the return value of Kernel.select.
      # On JRuby, select appears to return nil prior to timeout expiration
      # (apparently due to a EAGAIN) which then causes us to fail the read
      # even though we could have retried it.
      # Check the deadline ourselves.
      if deadline
        select_timeout = deadline - Utils.monotonic_time
        if select_timeout <= 0
          raise_timeout_error!("Took more than #{_timeout} seconds to receive data", csot)
        end
      end
    elsif rv.nil?
      raise_timeout_error!("Took more than #{_timeout} seconds to receive data (select call timed out)", csot)
    end
    retry
  end

  data
end
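
The core of the listing is a non-blocking read that falls back to select until a deadline passes. The following is a stripped-down sketch of that loop, assuming a UNIXSocket.pair in place of the driver's socket. read_exactly and monotonic_now are hypothetical helpers, and the sketch omits the pipe, lint, TLS and JRuby handling shown above.

```ruby
require 'socket'

# Hypothetical helper: current monotonic clock reading in seconds.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Hypothetical helper: reads exactly `length` bytes from `io`, or raises
# Errno::ETIMEDOUT once `timeout` seconds have passed.
def read_exactly(io, length, timeout)
  deadline = monotonic_now + timeout
  data = String.new(capacity: length, encoding: Encoding::BINARY)
  while data.bytesize < length
    begin
      data << io.read_nonblock(length - data.bytesize)
    rescue IO::WaitReadable
      remaining = deadline - monotonic_now
      raise Errno::ETIMEDOUT, 'read deadline exceeded' if remaining <= 0
      # select returns nil when it times out before the socket is readable.
      if IO.select([io], nil, [io], remaining).nil?
        raise Errno::ETIMEDOUT, 'select timed out'
      end
      retry
    end
  end
  data
end

reader, writer = UNIXSocket.pair
writer.write('hello')
writer.write(' world')
greeting = read_exactly(reader, 11, 1.0)
reader.close
writer.close
```

Unlike the listing, this sketch always copies chunks into one buffer; the driver avoids that copy when the first read already returns the full length.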

#read_with_timeout(length, timeout) ⇒ Object (private)

Reads length bytes from the socket. The duration of the whole read operation is limited to timeout seconds.

Parameters:

  • length (Integer)

    The number of bytes to read.

  • timeout (Numeric)

    The total timeout for the whole read operation.

Returns:

  • (Object)

    The data from the socket.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 273

def read_with_timeout(length, timeout)
  deadline = Utils.monotonic_time + timeout
  map_exceptions do
    String.new.tap do |data|
      while data.length < length
        socket_timeout = deadline - Utils.monotonic_time
        if socket_timeout <= 0
          raise Mongo::Error::TimeoutError
        end
        chunk = read_from_socket(length - data.length, socket_timeout: socket_timeout, csot: true)
        unless chunk.length > 0
          raise IOError, "Expected to read > 0 bytes but read 0 bytes"
        end
        data << chunk
      end
    end
  end
end

#read_without_timeout(length, socket_timeout = nil) ⇒ Object (private)

Reads length bytes from the socket. The read operation may involve multiple socket reads; each read is limited to socket_timeout seconds, if the parameter is provided.

Parameters:

  • length (Integer)

    The number of bytes to read.

  • socket_timeout (Numeric) (defaults to: nil)

    The timeout to use for each chunk read.

Returns:

  • (Object)

    The data from the socket.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 300

def read_without_timeout(length, socket_timeout = nil)
  map_exceptions do
    String.new.tap do |data|
      while data.length < length
        chunk = read_from_socket(length - data.length, socket_timeout: socket_timeout)
        unless chunk.length > 0
          raise IOError, "Expected to read > 0 bytes but read 0 bytes"
        end
        data << chunk
      end
    end
  end
end

#readbyte ⇒ Object

Read a single byte from the socket.

Examples:

Read a single byte.

socket.readbyte

Returns:

  • (Object)

    The read byte.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 224

def readbyte
  map_exceptions do
    @socket.readbyte
  end
end

#set_keepalive_opts(sock) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 564

def set_keepalive_opts(sock)
  sock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, true)
  set_option(sock, :TCP_KEEPINTVL, DEFAULT_TCP_KEEPINTVL)
  set_option(sock, :TCP_KEEPCNT, DEFAULT_TCP_KEEPCNT)
  set_option(sock, :TCP_KEEPIDLE, DEFAULT_TCP_KEEPIDLE)
  set_option(sock, :TCP_USER_TIMEOUT, DEFAULT_TCP_USER_TIMEOUT)
rescue
  # JRuby 9.2.13.0 and lower do not define TCP_KEEPINTVL etc. constants.
  # JRuby 9.2.14.0 defines the constants but does not allow to get or
  # set them with this error:
  # Errno::ENOPROTOOPT: Protocol not available - Protocol not available
end

#set_option(sock, option, default) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 577

def set_option(sock, option, default)
  if Socket.const_defined?(option)
    system_default = sock.getsockopt(IPPROTO_TCP, option).int
    if system_default > default
      sock.setsockopt(IPPROTO_TCP, option, default)
    end
  end
end
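
The listing only lowers an option when the system default is above the driver's default, and skips options the platform does not define. Below is a minimal sketch of that rule on an unconnected TCP socket. cap_tcp_option is a hypothetical helper, and the ceiling of 120 is an arbitrary value chosen for the example, not a value taken from the driver.

```ruby
require 'socket'

# Hypothetical helper: lowers a TCP-level option to `ceiling` if the current
# value is higher. Returns the resulting value, or nil when the platform
# does not define the option constant.
def cap_tcp_option(sock, option, ceiling)
  return nil unless Socket.const_defined?(option)

  current = sock.getsockopt(Socket::IPPROTO_TCP, option).int
  sock.setsockopt(Socket::IPPROTO_TCP, option, ceiling) if current > ceiling
  sock.getsockopt(Socket::IPPROTO_TCP, option).int
end

sock = Socket.new(:INET, :STREAM)
keepidle = cap_tcp_option(sock, :TCP_KEEPIDLE, 120)
sock.close
```

On platforms without TCP_KEEPIDLE, keepidle is nil; elsewhere it is at most 120 after the call.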

#set_socket_options(sock) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 586

def set_socket_options(sock)
  sock.set_encoding(BSON::BINARY)
  set_keepalive_opts(sock)
end

#summary ⇒ String

Returns:

  • (String)

    Human-readable summary of the socket for debugging.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 112

def summary
  fileno = @socket&.fileno rescue '<no socket>' || '<no socket>'
  if monitor?
    indicator = if options[:push]
      'pm'
    else
      'm'
    end
    "#{connection_address};#{indicator};fd=#{fileno}"
  else
    "#{connection_address};c:#{connection_generation};fd=#{fileno}"
  end
end
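
The two summary formats are easier to see with concrete values. The following sketch reproduces only the string formatting, assuming the address, file descriptor and generation are passed in directly; socket_summary is a hypothetical helper and the values are made up for illustration.

```ruby
# Hypothetical helper reproducing the string layout from the listing above.
def socket_summary(address:, fileno:, monitor: false, push: false, generation: nil)
  if monitor
    # 'pm' marks a push monitor socket, 'm' a regular monitor socket.
    indicator = push ? 'pm' : 'm'
    "#{address};#{indicator};fd=#{fileno}"
  else
    "#{address};c:#{generation};fd=#{fileno}"
  end
end

regular = socket_summary(address: 'localhost:27017', fileno: 9, generation: 3)
monitor = socket_summary(address: 'localhost:27017', fileno: 10, monitor: true)
push    = socket_summary(address: 'localhost:27017', fileno: 11, monitor: true, push: true)
```

Here regular is "localhost:27017;c:3;fd=9", monitor is "localhost:27017;m;fd=10" and push is "localhost:27017;pm;fd=11".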

#unix_socket?(sock) ⇒ Boolean (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 552

def unix_socket?(sock)
  defined?(UNIXSocket) && sock.is_a?(UNIXSocket)
end

#write(*args, timeout: nil) ⇒ Integer

Writes data to the socket instance.

Parameters:

  • args (Array<Object>)

    The data to be written.

  • timeout (Numeric)

    The total timeout for the whole write operation.

Returns:

  • (Integer)

    The number of bytes written to the socket.

Raises:

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 240

def write(*args, timeout: nil)
  map_exceptions do
    do_write(*args, timeout: timeout)
  end
end

#write_chunk(chunk, timeout) (private)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 524

def write_chunk(chunk, timeout)
  deadline = Utils.monotonic_time + timeout
  written = 0
  begin
    written += @socket.write_nonblock(chunk[written..-1])
  rescue IO::WaitWritable, Errno::EINTR
    select_timeout = deadline - Utils.monotonic_time
    rv = Kernel.select(nil, [@socket], nil, select_timeout)
    if BSON::Environment.jruby?
      # Ignore the return value of Kernel.select.
      # On JRuby, select appears to return nil prior to timeout expiration
      # (apparently due to a EAGAIN) which then causes us to fail the read
      # even though we could have retried it.
      # Check the deadline ourselves.
      if deadline
        select_timeout = deadline - Utils.monotonic_time
        if select_timeout <= 0
          raise_timeout_error!("Took more than #{timeout} seconds to receive data", true)
        end
      end
    elsif rv.nil?
      raise_timeout_error!("Took more than #{timeout} seconds to receive data (select call timed out)", true)
    end
    retry
  end
  written
end
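
The non-blocking write with a select fallback can be sketched on its own as follows, assuming a UNIXSocket.pair in place of the driver's socket. write_fully is a hypothetical helper. As a design choice of this sketch, it loops until every byte has been accepted, whereas the listing above issues a single write_nonblock per chunk and retries only when the socket is not writable.

```ruby
require 'socket'

# Hypothetical helper: writes all of `data` to `io`, or raises
# Errno::ETIMEDOUT once `timeout` seconds have passed.
def write_fully(io, data, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  written = 0
  while written < data.bytesize
    begin
      # write_nonblock may accept only part of the data; keep the count.
      written += io.write_nonblock(data.byteslice(written..-1))
    rescue IO::WaitWritable
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      raise Errno::ETIMEDOUT, 'write deadline exceeded' if remaining <= 0
      # select returns nil when it times out before the socket is writable.
      if IO.select(nil, [io], nil, remaining).nil?
        raise Errno::ETIMEDOUT, 'select timed out'
      end
      retry
    end
  end
  written
end

left, right = UNIXSocket.pair
payload = 'x' * 100_000
reader = Thread.new { right.read(payload.bytesize) }
bytes_written = write_fully(left, payload, 5)
received = reader.value
left.close
right.close
```

The reader thread drains the other end so that the socket buffer does not stay full; without a reader, a large enough payload would end in Errno::ETIMEDOUT.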

#write_with_timeout(*args, timeout:) ⇒ Integer (private)

Writes data to the socket; the write duration is limited to timeout seconds.

Parameters:

  • args (Array<Object>)

    The data to be written.

  • :timeout (Numeric)

    The total timeout for the whole write operation.

Returns:

  • (Integer)

    The number of bytes written to the socket.

Raises:

  • (ArgumentError)

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 507

def write_with_timeout(*args, timeout:)
  raise ArgumentError, 'timeout cannot be nil' if timeout.nil?
  raise_timeout_error!("Negative timeout #{timeout} given to socket", true) if timeout < 0

  written = 0
  args.each do |buf|
    buf = buf.to_s
    i = 0
    while i < buf.length
      chunk = buf[i...(i + WRITE_CHUNK_SIZE)]
      written += write_chunk(chunk, timeout)
      i += WRITE_CHUNK_SIZE
    end
  end
  written
end

#write_without_timeout(*args) ⇒ Integer (private)

Writes data to the socket.

Parameters:

  • args (Array<Object>)

    The data to be written.

Returns:

  • (Integer)

    The number of bytes written to the socket.

Since:

  • 2.0.0

[ GitHub ]

  
# File 'lib/mongo/socket.rb', line 477

def write_without_timeout(*args)
  # This method used to forward arguments to @socket.write in a
  # single call like so:
  #
  # @socket.write(*args)
  #
  # Turns out, when each buffer to be written is large (e.g. 32 MiB),
  # this write call would take an extremely long time (20+ seconds)
  # while using 100% CPU. Splitting the writes into chunks produced
  # massively better performance (0.05 seconds to write the 32 MiB of
  # data on the same hardware). Unfortunately splitting the data,
  # one would assume, results in it being copied, but this seems to be
  # a much more minor issue compared to CPU cost of writing large buffers.
  args.each do |buf|
    buf = buf.to_s
    i = 0
    while i < buf.length
      chunk = buf[i...i+WRITE_CHUNK_SIZE]
      @socket.write(chunk)
      i += WRITE_CHUNK_SIZE
    end
  end
end
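
The chunking loop shared by both write paths can be observed with a small recorder object in place of a socket. This is a sketch under stated assumptions: SKETCH_CHUNK_SIZE is a hypothetical, deliberately tiny stand-in for WRITE_CHUNK_SIZE, and ChunkRecorder and write_in_chunks are hypothetical names. The sketch also sums the return values of write to report a byte count, which the write_without_timeout listing above does not do.

```ruby
# Hypothetical chunk size, kept tiny so the individual chunks are visible.
SKETCH_CHUNK_SIZE = 4

# Hypothetical stand-in for a socket: remembers every chunk it is given.
class ChunkRecorder
  attr_reader :chunks

  def initialize
    @chunks = []
  end

  def write(chunk)
    @chunks << chunk
    chunk.bytesize
  end
end

# Writes each buffer to `io` in slices of at most SKETCH_CHUNK_SIZE
# characters and returns the total reported by io.write.
def write_in_chunks(io, *buffers)
  written = 0
  buffers.each do |buf|
    buf = buf.to_s
    i = 0
    while i < buf.length
      written += io.write(buf[i...(i + SKETCH_CHUNK_SIZE)])
      i += SKETCH_CHUNK_SIZE
    end
  end
  written
end

recorder = ChunkRecorder.new
total = write_in_chunks(recorder, 'abcdefghij', 'xy')
```

After the call, recorder.chunks is ["abcd", "efgh", "ij", "xy"] and total is 12. Chunks never span two buffers, because each buffer is sliced separately.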