Class: Mongo::Socket Private
Relationships & Source Files
Instance Chain: self, Socket::Constants
Inherits: Object
Defined in: lib/mongo/socket.rb, lib/mongo/socket/ocsp_cache.rb, lib/mongo/socket/ocsp_verifier.rb, lib/mongo/socket/ssl.rb, lib/mongo/socket/tcp.rb, lib/mongo/socket/unix.rb
Overview
Provides additional data around sockets for the driver’s use.
Constant Summary
-
DEFAULT_TCP_KEEPCNT =
9
-
DEFAULT_TCP_KEEPIDLE =
120
-
DEFAULT_TCP_KEEPINTVL =
10
-
DEFAULT_TCP_USER_TIMEOUT =
210
-
SSL_ERROR =
Deprecated.
Error message for TLS related exceptions.
'MongoDB may not be configured with TLS support'.freeze
-
TIMEOUT_ERROR =
Deprecated.
Error message for timeouts on socket calls.
'Socket request timed out'.freeze
-
TIMEOUT_PACK =
The pack directive for timeouts.
'l_2'.freeze
-
WRITE_CHUNK_SIZE =
Write data to the socket in chunks of this size.
65536
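As a rough illustration of what a pack directive such as TIMEOUT_PACK does: 'l_2' tells Array#pack to encode two native-size signed longs, which is the layout of a C struct timeval on many platforms. The helper name pack_timeout and the constant TIMEOUT_PACK_SKETCH below are hypothetical. How the driver actually consumes the packed value (for example as a socket timeout option) is an assumption that this page does not confirm.

```ruby
# Hypothetical sketch, not part of Mongo::Socket.
# Mirrors the value of TIMEOUT_PACK: two native signed longs.
TIMEOUT_PACK_SKETCH = 'l_2'.freeze

# Encodes a timeout in seconds as [whole seconds, microseconds].
def pack_timeout(seconds)
  whole = seconds.floor
  micros = ((seconds - whole) * 1_000_000).round
  [whole, micros].pack(TIMEOUT_PACK_SKETCH)
end

# Round trip: unpacking with the same directive recovers both fields.
secs, usecs = pack_timeout(1.5).unpack(TIMEOUT_PACK_SKETCH)
puts "seconds=#{secs} microseconds=#{usecs}"
```

The packed string's size depends on the platform's native long, so it should be treated as opaque bytes rather than compared across machines.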
Class Method Summary
-
.new(timeout, options) ⇒ Socket
constructor
Internal use only
Initializes common socket attributes.
Instance Attribute Summary
-
#alive? ⇒ true, false
readonly
deprecated
Internal use only
Deprecated.
Use #connectable? on the connection instead.
- #connectable? ⇒ true readonly deprecated Internal use only Deprecated.
-
#eof? ⇒ Boolean
readonly
Internal use only
Tests if this socket has reached EOF.
- #family ⇒ Integer readonly Internal use only
- #monitor? ⇒ true | false readonly Internal use only
- #options ⇒ Hash readonly Internal use only
- #socket ⇒ Socket readonly Internal use only
- #timeout ⇒ Float readonly Internal use only
Instance Method Summary
-
#close ⇒ true
Internal use only
Close the socket.
- #connection_address ⇒ Address Internal use only
- #connection_generation ⇒ Integer Internal use only
-
#gets(*args) ⇒ Object
Internal use only
Delegates gets to the underlying socket.
-
#read(length, socket_timeout: nil, timeout: nil) ⇒ Object
Internal use only
Will read all data from the socket for the provided number of bytes.
-
#readbyte ⇒ Object
Internal use only
Read a single byte from the socket.
- #summary ⇒ String Internal use only
-
#write(*args, timeout: nil) ⇒ Integer
Internal use only
Writes data to the socket instance.
- #allocate_string(capacity) private Internal use only
-
#do_write(*args, timeout: nil) ⇒ Integer
private
Internal use only
Writes data to the socket instance.
- #human_address private Internal use only
- #map_exceptions private Internal use only
- #raise_timeout_error!(message = nil, csot = false) private Internal use only
- #read_buffer_size private Internal use only
-
#read_from_socket(length, socket_timeout: nil, csot: false) ⇒ Object
private
Internal use only
Reads length bytes from the socket.
-
#read_with_timeout(length, timeout) ⇒ Object
private
Internal use only
Reads length bytes from the socket; the duration of the read operation is limited to timeout seconds.
-
#read_without_timeout(length, socket_timeout = nil) ⇒ Object
private
Internal use only
Reads length bytes from the socket.
- #set_keepalive_opts(sock) private Internal use only
- #set_option(sock, option, default) private Internal use only
- #set_socket_options(sock) private Internal use only
- #unix_socket?(sock) ⇒ Boolean private Internal use only
- #write_chunk(chunk, timeout) private Internal use only
-
#write_with_timeout(*args, timeout:) ⇒ Integer
private
Internal use only
Writes data to the socket; the write duration is limited to timeout seconds.
-
#write_without_timeout(*args) ⇒ Integer
private
Internal use only
Writes data to the socket.
Instance Attribute Details
#alive? ⇒ true, false
(readonly)
Deprecated. Use #connectable? on the connection instead.
Is the socket connection alive?
# File 'lib/mongo/socket.rb', line 134
def alive?
  sock_arr = [ @socket ]
  if Kernel::select(sock_arr, nil, sock_arr, 0)
    # The eof? call is supposed to return immediately since select
    # indicated the socket is readable. However, if @socket is a TLS
    # socket, eof? can block anyway - see RUBY-2140.
    begin
      Timeout.timeout(0.1) do
        eof?
      end
    rescue ::Timeout::Error
      true
    end
  else
    true
  end
end
#connectable? ⇒ true
(readonly)
For backwards compatibility only, do not use.
# File 'lib/mongo/socket.rb', line 260
def connectable?
  true
end
#eof? ⇒ Boolean
(readonly)
Tests if this socket has reached EOF. Primarily used for liveness checks.
# File 'lib/mongo/socket.rb', line 249
def eof?
  @socket.eof?
rescue IOError, SystemCallError
  true
end
#family ⇒ Integer
(readonly)
# File 'lib/mongo/socket.rb', line 75
attr_reader :family
#monitor? ⇒ true | false
(readonly)
# File 'lib/mongo/socket.rb', line 105
def monitor?
  !!options[:monitor]
end
#options ⇒ Hash
(readonly)
# File 'lib/mongo/socket.rb', line 81
attr_reader :options
#socket ⇒ Socket
(readonly)
# File 'lib/mongo/socket.rb', line 78
attr_reader :socket
#timeout ⇒ Float
(readonly)
# File 'lib/mongo/socket.rb', line 84
attr_reader :timeout
Instance Method Details
#allocate_string(capacity) (private)
# File 'lib/mongo/socket.rb', line 444
def allocate_string(capacity)
  String.new('', :capacity => capacity, :encoding => 'BINARY')
end
#close ⇒ true
Close the socket.
#connection_address ⇒ Address
# File 'lib/mongo/socket.rb', line 89
def connection_address
  options[:connection_address]
end
#connection_generation ⇒ Integer
# File 'lib/mongo/socket.rb', line 97
def connection_generation
  options[:connection_generation]
end
#do_write(*args, timeout: nil) ⇒ Integer
(private)
Writes data to the socket instance.
This is a separate method from #write for ease of mocking in the tests. This method should not perform any exception mapping; upstream code should map exceptions.
# File 'lib/mongo/socket.rb', line 464
def do_write(*args, timeout: nil)
  if timeout.nil?
    write_without_timeout(*args)
  else
    write_with_timeout(*args, timeout: timeout)
  end
end
#gets(*args) ⇒ Object
Delegates gets to the underlying socket.
# File 'lib/mongo/socket.rb', line 182
def gets(*args)
  map_exceptions do
    @socket.gets(*args)
  end
end
#human_address (private)
# File 'lib/mongo/socket.rb', line 603
def human_address
  raise NotImplementedError
end
#map_exceptions (private)
# File 'lib/mongo/socket.rb', line 591
def map_exceptions
  begin
    yield
  rescue Errno::ETIMEDOUT => e
    raise Error::SocketTimeoutError, "#{e.class}: #{e} (for #{human_address})"
  rescue IOError, SystemCallError => e
    raise Error::SocketError, "#{e.class}: #{e} (for #{human_address})"
  rescue OpenSSL::SSL::SSLError => e
    raise Error::SocketError, "#{e.class}: #{e} (for #{human_address})"
  end
end
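The mapping pattern used by #map_exceptions can be sketched in isolation as follows. The error classes SketchSocketError and SketchSocketTimeoutError and the helper map_socket_errors are hypothetical stand-ins, not the driver's classes, and the OpenSSL branch is left out so that the sketch needs no extra requires. One detail worth noticing is that the order of the rescue clauses matters: Errno::ETIMEDOUT is itself a SystemCallError, so it has to be rescued before the broader clause.

```ruby
# Hypothetical stand-ins for the driver's socket error classes.
class SketchSocketError < StandardError; end
class SketchSocketTimeoutError < StandardError; end

# Runs the block and re-raises low-level I/O errors as the sketch classes,
# appending the address so the message identifies the peer.
def map_socket_errors(address)
  yield
rescue Errno::ETIMEDOUT => e
  # Must come first: Errno::ETIMEDOUT is a subclass of SystemCallError.
  raise SketchSocketTimeoutError, "#{e.class}: #{e} (for #{address})"
rescue IOError, SystemCallError => e
  raise SketchSocketError, "#{e.class}: #{e} (for #{address})"
end

begin
  map_socket_errors('localhost:27017') { raise EOFError, 'end of file reached' }
rescue SketchSocketError => e
  puts e.message
end
```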
#raise_timeout_error!(message = nil, csot = false) (private)
# File 'lib/mongo/socket.rb', line 607
def raise_timeout_error!(message = nil, csot = false)
  if csot
    raise Mongo::Error::TimeoutError
  else
    raise Errno::ETIMEDOUT, message
  end
end
#read(length, socket_timeout: nil, timeout: nil) ⇒ Object
Will read all data from the socket for the provided number of bytes. If no data is returned, an exception will be raised.
# File 'lib/mongo/socket.rb', line 205
def read(length, socket_timeout: nil, timeout: nil)
  if !socket_timeout.nil? && !timeout.nil?
    raise ArgumentError, 'Both timeout and socket_timeout cannot be set'
  end
  if !socket_timeout.nil? || timeout.nil?
    read_without_timeout(length, socket_timeout)
  else
    read_with_timeout(length, timeout)
  end
end
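The argument handling in #read boils down to a small decision table, sketched below as a standalone function. The name read_strategy is hypothetical. It only reports which private reader the listing above would select for a given combination of keyword arguments and performs no I/O.

```ruby
# Hypothetical helper: returns the name of the reader that #read, as listed
# above, would dispatch to for the given keyword arguments.
def read_strategy(socket_timeout: nil, timeout: nil)
  if !socket_timeout.nil? && !timeout.nil?
    raise ArgumentError, 'Both timeout and socket_timeout cannot be set'
  end
  if !socket_timeout.nil? || timeout.nil?
    # Per-read limit (or no explicit limit at all).
    :read_without_timeout
  else
    # Limit on the whole read operation.
    :read_with_timeout
  end
end

puts read_strategy                      # neither argument given
puts read_strategy(socket_timeout: 5)   # per-read limit
puts read_strategy(timeout: 5)          # whole-operation limit
```

Note that passing neither argument falls through to the same path as socket_timeout, so only an explicit timeout selects the whole-operation limit.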
#read_buffer_size (private)
# File 'lib/mongo/socket.rb', line 448
def read_buffer_size
  # Buffer size for non-TLS reads
  # 64kb
  65536
end
#read_from_socket(length, socket_timeout: nil, csot: false) ⇒ Object
(private)
Reads length bytes from the socket. The read operation may involve multiple socket reads; each read is limited to #timeout seconds, if the parameter is provided.
# File 'lib/mongo/socket.rb', line 324
def read_from_socket(length, socket_timeout: nil, csot: false)
  # Just in case
  if length == 0
    return ''.force_encoding('BINARY')
  end

  _timeout = socket_timeout || self.timeout
  if _timeout
    if _timeout > 0
      deadline = Utils.monotonic_time + _timeout
    elsif _timeout < 0
      raise_timeout_error!("Negative timeout #{_timeout} given to socket", csot)
    end
  end

  # We want to have a fixed and reasonably small size buffer for reads
  # because, for example, OpenSSL reads in 16 kb chunks max.
  # Having a 16 mb buffer means there will be 1000 reads each allocating
  # 16 mb of memory and using 16 kb of it.
  buf_size = read_buffer_size
  data = nil

  # If we want to read less than the buffer size, just allocate the
  # memory that is necessary
  if length < buf_size
    buf_size = length
  end

  # The binary encoding is important, otherwise Ruby performs encoding
  # conversions of some sort during the write into the buffer which
  # kills performance
  buf = allocate_string(buf_size)
  retrieved = 0
  begin
    while retrieved < length
      retrieve = length - retrieved
      if retrieve > buf_size
        retrieve = buf_size
      end
      chunk = @socket.read_nonblock(retrieve, buf)

      # If we read the entire wanted length in one operation,
      # return the data as is which saves one memory allocation and
      # one copy per read
      if retrieved == 0 && chunk.length == length
        return chunk
      end

      # If we are here, we are reading the wanted length in
      # multiple operations. Allocate the total buffer here rather
      # than up front so that the special case above won't be
      # allocating twice
      if data.nil?
        data = allocate_string(length)
      end

      # ... and we need to copy the chunks at this point
      data[retrieved, chunk.length] = chunk
      retrieved += chunk.length
    end
  # As explained in https://ruby-doc.com/core-trunk/IO.html#method-c-select,
  # reading from a TLS socket may require writing which may raise WaitWritable
  rescue IO::WaitReadable, IO::WaitWritable => exc
    if deadline
      select_timeout = deadline - Utils.monotonic_time
      if select_timeout <= 0
        raise_timeout_error!("Took more than #{_timeout} seconds to receive data", csot)
      end
    end
    pipe = options[:pipe]
    if exc.is_a?(IO::WaitReadable)
      if pipe
        select_args = [[@socket, pipe], nil, [@socket, pipe], select_timeout]
      else
        select_args = [[@socket], nil, [@socket], select_timeout]
      end
    else
      select_args = [nil, [@socket], [@socket], select_timeout]
    end

    rv = Kernel.select(*select_args)
    if Lint.enabled?
      if pipe && rv&.include?(pipe)
        # If the return value of select is the read end of the pipe, and
        # an IOError is not raised, then that means the socket is still
        # open. Select is interrupted by closing the write end of the
        # pipe, which either returns the pipe if the socket is open, or
        # raises an IOError if it isn't. Select is interrupted after all
        # of the pending and checked out connections have been interrupted
        # and closed, and this only happens once the pool is cleared with
        # interrupt_in_use connections flag. This means that in order for
        # the socket to still be open when the select is interrupted, and
        # that socket is being read from, that means after clear was
        # called, a connection from the previous generation was checked
        # out of the pool, for reading on its socket. This should be impossible.
        raise Mongo::LintError, "Select interrupted for live socket. This should be impossible."
      end
    end

    if BSON::Environment.jruby?
      # Ignore the return value of Kernel.select.
      # On JRuby, select appears to return nil prior to timeout expiration
      # (apparently due to a EAGAIN) which then causes us to fail the read
      # even though we could have retried it.
      # Check the deadline ourselves.
      if deadline
        select_timeout = deadline - Utils.monotonic_time
        if select_timeout <= 0
          raise_timeout_error!("Took more than #{_timeout} seconds to receive data", csot)
        end
      end
    elsif rv.nil?
      raise_timeout_error!("Took more than #{_timeout} seconds to receive data (select call timed out)", csot)
    end
    retry
  end

  data
end
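The core of the listing above is a non-blocking read loop that waits on select with a shrinking deadline. A much reduced sketch of that technique, using only the Ruby standard library, might look like the following. The name read_exactly is hypothetical. The sketch leaves out the TLS write-wait case, the interrupt pipe, the JRuby workaround, and the buffer reuse, and it always raises Errno::ETIMEDOUT on timeout.

```ruby
# Hypothetical sketch: read exactly `length` bytes from `io`, giving up once
# `timeout` seconds have elapsed in total.
def read_exactly(io, length, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  # Binary encoding avoids encoding conversions while appending chunks.
  data = String.new(capacity: length, encoding: 'BINARY')
  while data.bytesize < length
    begin
      data << io.read_nonblock(length - data.bytesize)
    rescue IO::WaitReadable
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      raise Errno::ETIMEDOUT, 'read deadline exceeded' if remaining <= 0
      # Block until the descriptor is readable or the time budget runs out.
      if IO.select([io], nil, nil, remaining).nil?
        raise Errno::ETIMEDOUT, 'select call timed out'
      end
    end
  end
  data
end

reader, writer = IO.pipe
writer.write('hello')
puts read_exactly(reader, 5, 1.0)
```

An EOFError from read_nonblock is deliberately left unhandled here, so a closed peer surfaces as an exception rather than as a short read.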
#read_with_timeout(length, timeout) ⇒ Object
(private)
Reads length bytes from the socket. The duration of the whole read operation is limited to timeout seconds.
# File 'lib/mongo/socket.rb', line 273
def read_with_timeout(length, timeout)
  deadline = Utils.monotonic_time + timeout
  map_exceptions do
    String.new.tap do |data|
      while data.length < length
        socket_timeout = deadline - Utils.monotonic_time
        if socket_timeout <= 0
          raise Mongo::Error::TimeoutError
        end
        chunk = read_from_socket(length - data.length, socket_timeout: socket_timeout, csot: true)
        unless chunk.length > 0
          raise IOError, "Expected to read > 0 bytes but read 0 bytes"
        end
        data << chunk
      end
    end
  end
end
#read_without_timeout(length, socket_timeout = nil) ⇒ Object
(private)
Reads length bytes from the socket. The read operation may involve multiple socket reads; each read is limited to #timeout seconds, if the parameter is provided.
# File 'lib/mongo/socket.rb', line 300
def read_without_timeout(length, socket_timeout = nil)
  map_exceptions do
    String.new.tap do |data|
      while data.length < length
        chunk = read_from_socket(length - data.length, socket_timeout: socket_timeout)
        unless chunk.length > 0
          raise IOError, "Expected to read > 0 bytes but read 0 bytes"
        end
        data << chunk
      end
    end
  end
end
#readbyte ⇒ Object
Read a single byte from the socket.
# File 'lib/mongo/socket.rb', line 224
def readbyte
  map_exceptions do
    @socket.readbyte
  end
end
#set_keepalive_opts(sock) (private)
# File 'lib/mongo/socket.rb', line 564
def set_keepalive_opts(sock)
  sock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, true)
  set_option(sock, :TCP_KEEPINTVL, DEFAULT_TCP_KEEPINTVL)
  set_option(sock, :TCP_KEEPCNT, DEFAULT_TCP_KEEPCNT)
  set_option(sock, :TCP_KEEPIDLE, DEFAULT_TCP_KEEPIDLE)
  set_option(sock, :TCP_USER_TIMEOUT, DEFAULT_TCP_USER_TIMEOUT)
rescue
  # JRuby 9.2.13.0 and lower do not define TCP_KEEPINTVL etc. constants.
  # JRuby 9.2.14.0 defines the constants but does not allow to get or
  # set them with this error:
  # Errno::ENOPROTOOPT: Protocol not available - Protocol not available
end
#set_option(sock, option, default) (private)
# File 'lib/mongo/socket.rb', line 577
def set_option(sock, option, default)
  if Socket.const_defined?(option)
    system_default = sock.getsockopt(IPPROTO_TCP, option).int
    if system_default > default
      sock.setsockopt(IPPROTO_TCP, option, default)
    end
  end
end
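The rule implemented by #set_option is "only lower a TCP option, never raise it": the driver's default is applied only when the system value is larger. A standalone sketch of that rule is shown below. The name cap_tcp_option and its return value are hypothetical additions for illustration. It accepts any object that responds to getsockopt and setsockopt in the way Ruby's socket classes do.

```ruby
require 'socket'

# Hypothetical sketch: lowers the TCP-level option named by `option`
# (a symbol such as :TCP_KEEPIDLE) to `desired` when the current value is
# larger. Returns the value in effect afterwards, or nil when the platform
# does not define the option.
def cap_tcp_option(sock, option, desired)
  return nil unless Socket.const_defined?(option)

  optname = Socket.const_get(option)
  current = sock.getsockopt(Socket::IPPROTO_TCP, optname).int
  if current > desired
    sock.setsockopt(Socket::IPPROTO_TCP, optname, desired)
    desired
  else
    # A stricter system setting is left untouched.
    current
  end
end
```

With a connected TCPSocket in the variable sock, a call such as cap_tcp_option(sock, :TCP_KEEPIDLE, 120) would follow the same pattern as the listing above; which options are available depends on the operating system.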
#set_socket_options(sock) (private)
# File 'lib/mongo/socket.rb', line 586
def set_socket_options(sock)
  sock.set_encoding(BSON::BINARY)
  set_keepalive_opts(sock)
end
#summary ⇒ String
# File 'lib/mongo/socket.rb', line 112
def summary
  fileno = @socket&.fileno rescue '<no socket>' || '<no socket>'
  if monitor?
    indicator = if options[:push]
      'pm'
    else
      'm'
    end
    "#{connection_address};#{indicator};fd=#{fileno}"
  else
    "#{connection_address};c:#{connection_generation};fd=#{fileno}"
  end
end
#unix_socket?(sock) ⇒ Boolean
(private)
# File 'lib/mongo/socket.rb', line 552
def unix_socket?(sock)
  defined?(UNIXSocket) && sock.is_a?(UNIXSocket)
end
#write(*args, timeout: nil) ⇒ Integer
Writes data to the socket instance.
# File 'lib/mongo/socket.rb', line 240
def write(*args, timeout: nil)
  map_exceptions do
    do_write(*args, timeout: timeout)
  end
end
#write_chunk(chunk, timeout) (private)
# File 'lib/mongo/socket.rb', line 524
def write_chunk(chunk, timeout)
  deadline = Utils.monotonic_time + timeout
  written = 0
  begin
    written += @socket.write_nonblock(chunk[written..-1])
  rescue IO::WaitWritable, Errno::EINTR
    select_timeout = deadline - Utils.monotonic_time
    rv = Kernel.select(nil, [@socket], nil, select_timeout)
    if BSON::Environment.jruby?
      # Ignore the return value of Kernel.select.
      # On JRuby, select appears to return nil prior to timeout expiration
      # (apparently due to a EAGAIN) which then causes us to fail the read
      # even though we could have retried it.
      # Check the deadline ourselves.
      if deadline
        select_timeout = deadline - Utils.monotonic_time
        if select_timeout <= 0
          raise_timeout_error!("Took more than #{timeout} seconds to receive data", true)
        end
      end
    elsif rv.nil?
      raise_timeout_error!("Took more than #{timeout} seconds to receive data (select call timed out)", true)
    end
    retry
  end
  written
end
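The same deadline technique applies on the write side: attempt a non-blocking write, and when the descriptor is not writable, wait on select for at most the remaining time. The sketch below is a variation rather than a copy of #write_chunk. The name write_with_deadline is hypothetical, it keeps looping until every byte has been accepted, and it raises Errno::ETIMEDOUT instead of the driver's timeout error.

```ruby
# Hypothetical sketch: write all of `data` to `io` within `timeout` seconds.
def write_with_deadline(io, data, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  written = 0
  while written < data.bytesize
    begin
      # write_nonblock may accept only part of the buffer.
      written += io.write_nonblock(data.byteslice(written..-1))
    rescue IO::WaitWritable, Errno::EINTR
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      raise Errno::ETIMEDOUT, 'write deadline exceeded' if remaining <= 0
      # Wait until the descriptor is writable or the budget runs out; the
      # deadline check above decides on the next pass whether to give up.
      IO.select(nil, [io], nil, remaining)
    end
  end
  written
end

reader, writer = IO.pipe
puts write_with_deadline(writer, 'abc', 1.0)
puts reader.read_nonblock(3)
```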
#write_with_timeout(*args, timeout:) ⇒ Integer
(private)
Writes data to the socket; the write duration is limited to timeout seconds.
# File 'lib/mongo/socket.rb', line 507
def write_with_timeout(*args, timeout:)
  raise ArgumentError, 'timeout cannot be nil' if timeout.nil?
  raise_timeout_error!("Negative timeout #{timeout} given to socket", true) if timeout < 0

  written = 0
  args.each do |buf|
    buf = buf.to_s
    i = 0
    while i < buf.length
      chunk = buf[i...(i + WRITE_CHUNK_SIZE)]
      written += write_chunk(chunk, timeout)
      i += WRITE_CHUNK_SIZE
    end
  end
  written
end
#write_without_timeout(*args) ⇒ Integer
(private)
Writes data to the socket.
# File 'lib/mongo/socket.rb', line 477
def write_without_timeout(*args)
  # This method used to forward arguments to @socket.write in a
  # single call like so:
  #
  # @socket.write(*args)
  #
  # Turns out, when each buffer to be written is large (e.g. 32 MiB),
  # this write call would take an extremely long time (20+ seconds)
  # while using 100% CPU. Splitting the writes into chunks produced
  # massively better performance (0.05 seconds to write the 32 MiB of
  # data on the same hardware). Unfortunately splitting the data,
  # one would assume, results in it being copied, but this seems to be
  # a much more minor issue compared to CPU cost of writing large buffers.
  args.each do |buf|
    buf = buf.to_s
    i = 0
    while i < buf.length
      chunk = buf[i...i+WRITE_CHUNK_SIZE]
      @socket.write(chunk)
      i += WRITE_CHUNK_SIZE
    end
  end
end
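The chunking described in the comment above can be tried against any IO-like object. The following sketch uses a StringIO in place of a socket. The names write_in_chunks and CHUNK_SIZE_SKETCH are hypothetical, and unlike the listing above the sketch slices by bytes and returns the total number of bytes written, which is an illustrative choice rather than the driver's behavior.

```ruby
require 'stringio'

# Mirrors the value of WRITE_CHUNK_SIZE (64 KiB).
CHUNK_SIZE_SKETCH = 65_536

# Hypothetical sketch: writes each buffer to `io` in slices of at most
# `chunk_size` bytes and returns the total number of bytes written.
def write_in_chunks(io, *buffers, chunk_size: CHUNK_SIZE_SKETCH)
  written = 0
  buffers.each do |buf|
    buf = buf.to_s
    offset = 0
    while offset < buf.bytesize
      written += io.write(buf.byteslice(offset, chunk_size))
      offset += chunk_size
    end
  end
  written
end

io = StringIO.new
total = write_in_chunks(io, 'a' * 150_000, 'bc')
puts "wrote #{total} bytes"
```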