Class: Mongo::Socket Private
| Relationships & Source Files | |
| Namespace Children | |
|
Modules:
| |
|
Classes:
| |
| Extension / Inclusion / Inheritance Descendants | |
|
Subclasses:
|
|
| Super Chains via Extension / Inclusion / Inheritance | |
|
Instance Chain:
self,
Socket::Constants
|
|
| Inherits: | Object |
| Defined in: | lib/mongo/socket.rb, lib/mongo/socket/ocsp_cache.rb, lib/mongo/socket/ocsp_verifier.rb, lib/mongo/socket/ssl.rb, lib/mongo/socket/tcp.rb, lib/mongo/socket/unix.rb |
Overview
Provides additional data around sockets for the driver’s use.
Constant Summary
-
DEFAULT_TCP_KEEPCNT =
# File 'lib/mongo/socket.rb', line 556
9 -
DEFAULT_TCP_KEEPIDLE =
# File 'lib/mongo/socket.rb', line 558
120 -
DEFAULT_TCP_KEEPINTVL =
# File 'lib/mongo/socket.rb', line 554
10 -
DEFAULT_TCP_USER_TIMEOUT =
# File 'lib/mongo/socket.rb', line 560
210 -
SSL_ERROR =
# File 'lib/mongo/socket.rb', line 35Deprecated.
Errormessage for TLS related exceptions.'MongoDB may not be configured with TLS support' -
TIMEOUT_ERROR =
# File 'lib/mongo/socket.rb', line 41Deprecated.
Errormessage for timeouts on socket calls.'Socket request timed out' -
TIMEOUT_PACK =
# File 'lib/mongo/socket.rb', line 46
The pack directive for timeouts.
'l_2' -
WRITE_CHUNK_SIZE =
# File 'lib/mongo/socket.rb', line 51
Write data to the socket in chunks of this size.
65_536
Class Method Summary
-
.new(timeout, options) ⇒ Socket
constructor
Internal use only
Initializes common socket attributes.
Instance Attribute Summary
-
#alive? ⇒ true, false
readonly
deprecated
Internal use only
Deprecated.
Use #connectable? on the connection instead.
- #connectable? ⇒ true readonly deprecated Internal use only Deprecated.
-
#eof? ⇒ Boolean
readonly
Internal use only
Tests if this socket has reached EOF.
- #family ⇒ Integer readonly Internal use only
- #monitor? ⇒ true | false readonly Internal use only
- #options ⇒ Hash readonly Internal use only
- #socket ⇒ Socket readonly Internal use only
- #timeout ⇒ Float readonly Internal use only
Instance Method Summary
-
#close ⇒ true
Internal use only
Close the socket.
- #connection_address ⇒ Address Internal use only
- #connection_generation ⇒ Integer Internal use only
-
#gets(*args) ⇒ Object
Internal use only
Delegates gets to the underlying socket.
-
#pipe ⇒ IO
Internal use only
listen on during the select system call when reading from the socket.
-
#read(length, socket_timeout: nil, timeout: nil) ⇒ Object
Internal use only
Will read all data from the socket for the provided number of bytes.
-
#readbyte ⇒ Object
Internal use only
Read a single byte from the socket.
- #summary ⇒ String Internal use only
-
#write(*args, timeout: nil) ⇒ Integer
Internal use only
Writes data to the socket instance.
- #allocate_string(capacity) private Internal use only
-
#do_write(*args, timeout: nil) ⇒ Integer
private
Internal use only
Writes data to the socket instance.
- #human_address private Internal use only
- #map_exceptions private Internal use only
- #raise_timeout_error!(message = nil, csot = false) private Internal use only
- #read_buffer_size private Internal use only
-
#read_from_socket(length, socket_timeout: nil, csot: false) ⇒ Object
private
Internal use only
Reads the
lengthbytes from the socket. -
#read_with_timeout(length, timeout) ⇒ Object
private
Internal use only
Reads the
lengthbytes from the socket, the read operation duration is limited to #timeout second. -
#read_without_timeout(length, socket_timeout = nil) ⇒ Object
private
Internal use only
Reads the
lengthbytes from the socket. - #set_keepalive_opts(sock) private Internal use only
- #set_option(sock, option, default) private Internal use only
- #set_socket_options(sock) private Internal use only
- #unix_socket?(sock) ⇒ Boolean private Internal use only
- #wait_for_socket_to_be_writable(deadline) private Internal use only
- #write_chunk(chunk, timeout) private Internal use only
-
#write_with_timeout(*args, timeout:) ⇒ Integer
private
Internal use only
Writes data to the socket, the write duration is limited to #timeout.
-
#write_without_timeout(*args) ⇒ Integer
private
Internal use only
Writes data to the socket.
Instance Attribute Details
#alive? ⇒ true, false (readonly)
Use #connectable? on the connection instead.
Is the socket connection alive?
# File 'lib/mongo/socket.rb', line 146
def alive? sock_arr = [ @socket ] if Kernel.select(sock_arr, nil, sock_arr, 0) # The eof? call is supposed to return immediately since select # indicated the socket is readable. However, if @socket is a TLS # socket, eof? can block anyway - see RUBY-2140. begin Timeout.timeout(0.1) do eof? end rescue ::Timeout::Error true end else true end end
#connectable? ⇒ true (readonly)
For backwards compatibility only, do not use.
# File 'lib/mongo/socket.rb', line 271
def connectable? true end
#eof? ⇒ Boolean (readonly)
Tests if this socket has reached EOF. Primarily used for liveness checks.
# File 'lib/mongo/socket.rb', line 260
def eof? @socket.eof? rescue IOError, SystemCallError true end
#family ⇒ Integer (readonly)
# File 'lib/mongo/socket.rb', line 76
attr_reader :family
#monitor? ⇒ true | false (readonly)
# File 'lib/mongo/socket.rb', line 106
def monitor? !![:monitor] end
#options ⇒ Hash (readonly)
# File 'lib/mongo/socket.rb', line 82
attr_reader :
#socket ⇒ Socket (readonly)
# File 'lib/mongo/socket.rb', line 79
attr_reader :socket
#timeout ⇒ Float (readonly)
# File 'lib/mongo/socket.rb', line 85
attr_reader :timeout
Instance Method Details
#allocate_string(capacity) (private)
# File 'lib/mongo/socket.rb', line 434
def allocate_string(capacity) String.new('', capacity: capacity, encoding: 'BINARY') end
#close ⇒ true
Close the socket.
#connection_address ⇒ Address
# File 'lib/mongo/socket.rb', line 90
def connection_address [:connection_address] end
#connection_generation ⇒ Integer
# File 'lib/mongo/socket.rb', line 98
def connection_generation [:connection_generation] end
#do_write(*args, timeout: nil) ⇒ Integer (private)
Writes data to the socket instance.
This is a separate method from #write for ease of mocking in the tests. This method should not perform any exception mapping, upstream code should map exceptions.
# File 'lib/mongo/socket.rb', line 454
def do_write(*args, timeout: nil) if timeout.nil? write_without_timeout(*args) else write_with_timeout(*args, timeout: timeout) end end
#gets(*args) ⇒ Object
Delegates gets to the underlying socket.
# File 'lib/mongo/socket.rb', line 194
def gets(*args) map_exceptions do @socket.gets(*args) end end
#human_address (private)
# File 'lib/mongo/socket.rb', line 599
def human_address raise NotImplementedError end
#map_exceptions (private)
# File 'lib/mongo/socket.rb', line 589
def map_exceptions yield rescue Errno::ETIMEDOUT => e raise Error::SocketTimeoutError, "#{e.class}: #{e} (for #{human_address})" rescue IOError, SystemCallError, ::SocketError => e raise Error::SocketError, "#{e.class}: #{e} (for #{human_address})" rescue OpenSSL::SSL::SSLError => e raise Error::SocketError, "#{e.class}: #{e} (for #{human_address})" end
#pipe ⇒ IO
listen on during the select system call when reading from the socket.
# File 'lib/mongo/socket.rb', line 113
def pipe [:pipe] end
#raise_timeout_error!(message = nil, csot = false) (private)
# File 'lib/mongo/socket.rb', line 603
def raise_timeout_error!( = nil, csot = false) raise Mongo::Error::TimeoutError if csot raise Errno::ETIMEDOUT, end
#read(length, socket_timeout: nil, timeout: nil) ⇒ Object
Will read all data from the socket for the provided number of bytes. If no data is returned, an exception will be raised.
# File 'lib/mongo/socket.rb', line 217
def read(length, socket_timeout: nil, timeout: nil) raise ArgumentError, 'Both timeout and socket_timeout cannot be set' if !socket_timeout.nil? && !timeout.nil? if !socket_timeout.nil? || timeout.nil? read_without_timeout(length, socket_timeout) else read_with_timeout(length, timeout) end end
#read_buffer_size (private)
# File 'lib/mongo/socket.rb', line 438
def read_buffer_size # Buffer size for non-TLS reads # 64kb 65_536 end
#read_from_socket(length, socket_timeout: nil, csot: false) ⇒ Object (private)
Reads the length bytes from the socket. The read operation may involve multiple socket reads, each read is limited to #timeout second, if the parameter is provided.
# File 'lib/mongo/socket.rb', line 331
def read_from_socket(length, socket_timeout: nil, csot: false) # Just in case return ''.force_encoding('BINARY') if length == 0 _timeout = socket_timeout || timeout if _timeout if _timeout > 0 deadline = Utils.monotonic_time + _timeout elsif _timeout < 0 raise_timeout_error!("Negative timeout #{_timeout} given to socket", csot) end end # We want to have a fixed and reasonably small size buffer for reads # because, for example, OpenSSL reads in 16 kb chunks max. # Having a 16 mb buffer means there will be 1000 reads each allocating # 16 mb of memory and using 16 kb of it. buf_size = read_buffer_size data = nil # If we want to read less than the buffer size, just allocate the # memory that is necessary buf_size = length if length < buf_size # The binary encoding is important, otherwise Ruby performs encoding # conversions of some sort during the write into the buffer which # kills performance buf = allocate_string(buf_size) retrieved = 0 begin while retrieved < length retrieve = length - retrieved retrieve = buf_size if retrieve > buf_size chunk = @socket.read_nonblock(retrieve, buf) # If we read the entire wanted length in one operation, # return the data as is which saves one memory allocation and # one copy per read return chunk if retrieved == 0 && chunk.length == length # If we are here, we are reading the wanted length in # multiple operations. Allocate the total buffer here rather # than up front so that the special case above won't be # allocating twice data = allocate_string(length) if data.nil? # ... and we need to copy the chunks at this point data[retrieved, chunk.length] = chunk retrieved += chunk.length end # As explained in https://ruby-doc.com/core-trunk/IO.html#method-c-select, # reading from a TLS socket may require writing which may raise WaitWritable rescue IO::WaitReadable, IO::WaitWritable => e if deadline select_timeout = deadline - Utils.monotonic_time raise_timeout_error!("Took more than #{_timeout} seconds to receive data", csot) if select_timeout <= 0 end select_args = if e.is_a?(IO::WaitReadable) if pipe [ [ @socket, pipe ], nil, [ @socket, pipe ], select_timeout ] else [ [ @socket ], nil, [ @socket ], select_timeout ] end else [ nil, [ @socket ], [ @socket ], select_timeout ] end rv = Kernel.select(*select_args) if Lint.enabled? && pipe && rv&.include?(pipe) # If the return value of select is the read end of the pipe, and # an IOError is not raised, then that means the socket is still # open. Select is interrupted be closing the write end of the # pipe, which either returns the pipe if the socket is open, or # raises an IOError if it isn't. Select is interrupted after all # of the pending and checked out connections have been interrupted # and closed, and this only happens once the pool is cleared with # interrupt_in_use connections flag. This means that in order for # the socket to still be open when the select is interrupted, and # that socket is being read from, that means after clear was # called, a connection from the previous generation was checked # out of the pool, for reading on its socket. This should be impossible. raise Mongo::LintError, 'Select interrupted for live socket. This should be impossible.' end if BSON::Environment.jruby? # Ignore the return value of Kernel.select. # On JRuby, select appears to return nil prior to timeout expiration # (apparently due to a EAGAIN) which then causes us to fail the read # even though we could have retried it. # Check the deadline ourselves. if deadline select_timeout = deadline - Utils.monotonic_time raise_timeout_error!("Took more than #{_timeout} seconds to receive data", csot) if select_timeout <= 0 end elsif rv.nil? raise_timeout_error!("Took more than #{_timeout} seconds to receive data (select call timed out)", csot) end retry end data end
#read_with_timeout(length, timeout) ⇒ Object (private)
Reads the length bytes from the socket, the read operation duration is limited to #timeout second.
# File 'lib/mongo/socket.rb', line 284
def read_with_timeout(length, timeout) deadline = Utils.monotonic_time + timeout map_exceptions do String.new.tap do |data| while data.length < length socket_timeout = deadline - Utils.monotonic_time raise Mongo::Error::TimeoutError if socket_timeout <= 0 chunk = read_from_socket(length - data.length, socket_timeout: socket_timeout, csot: true) raise IOError, 'Expected to read > 0 bytes but read 0 bytes' unless chunk.length > 0 data << chunk end end end end
#read_without_timeout(length, socket_timeout = nil) ⇒ Object (private)
Reads the length bytes from the socket. The read operation may involve multiple socket reads, each read is limited to #timeout second, if the parameter is provided.
# File 'lib/mongo/socket.rb', line 309
def read_without_timeout(length, socket_timeout = nil) map_exceptions do String.new.tap do |data| while data.length < length chunk = read_from_socket(length - data.length, socket_timeout: socket_timeout) raise IOError, 'Expected to read > 0 bytes but read 0 bytes' unless chunk.length > 0 data << chunk end end end end
#readbyte ⇒ Object
Read a single byte from the socket.
# File 'lib/mongo/socket.rb', line 235
def readbyte map_exceptions do @socket.readbyte end end
#set_keepalive_opts(sock) (private)
# File 'lib/mongo/socket.rb', line 562
def set_keepalive_opts(sock) sock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, true) set_option(sock, :TCP_KEEPINTVL, DEFAULT_TCP_KEEPINTVL) set_option(sock, :TCP_KEEPCNT, DEFAULT_TCP_KEEPCNT) set_option(sock, :TCP_KEEPIDLE, DEFAULT_TCP_KEEPIDLE) set_option(sock, :TCP_USER_TIMEOUT, DEFAULT_TCP_USER_TIMEOUT) rescue StandardError # JRuby 9.2.13.0 and lower do not define TCP_KEEPINTVL etc. constants. # JRuby 9.2.14.0 defines the constants but does not allow to get or # set them with this error: # Errno::ENOPROTOOPT: Protocol not available - Protocol not available end
#set_option(sock, option, default) (private)
# File 'lib/mongo/socket.rb', line 575
def set_option(sock, option, default) return unless Socket.const_defined?(option) system_default = sock.getsockopt(IPPROTO_TCP, option).int return unless system_default > default sock.setsockopt(IPPROTO_TCP, option, default) end
#set_socket_options(sock) (private)
# File 'lib/mongo/socket.rb', line 584
def (sock) sock.set_encoding(BSON::BINARY) set_keepalive_opts(sock) end
#summary ⇒ String
# File 'lib/mongo/socket.rb', line 120
def summary fileno = begin @socket&.fileno rescue StandardError '<no socket>' end if monitor? indicator = if [:push] 'pm' else 'm' end "#{connection_address};#{indicator};fd=#{fileno}" else "#{connection_address};c:#{connection_generation};fd=#{fileno}" end end
#unix_socket?(sock) ⇒ Boolean (private)
# File 'lib/mongo/socket.rb', line 550
def unix_socket?(sock) defined?(UNIXSocket) && sock.is_a?(UNIXSocket) end
#wait_for_socket_to_be_writable(deadline) (private)
# File 'lib/mongo/socket.rb', line 533
def wait_for_socket_to_be_writable(deadline) select_timeout = deadline - Utils.monotonic_time rv = Kernel.select(nil, [ @socket ], nil, select_timeout) if BSON::Environment.jruby? # Ignore the return value of Kernel.select. # On JRuby, select appears to return nil prior to timeout expiration # (apparently due to a EAGAIN) which then causes us to fail the read # even though we could have retried it. # Check the deadline ourselves. select_timeout = deadline - Utils.monotonic_time return select_timeout > 0 end !rv.nil? end
#write(*args, timeout: nil) ⇒ Integer
Writes data to the socket instance.
# File 'lib/mongo/socket.rb', line 251
def write(*args, timeout: nil) map_exceptions do do_write(*args, timeout: timeout) end end
#write_chunk(chunk, timeout) (private)
# File 'lib/mongo/socket.rb', line 514
def write_chunk(chunk, timeout) deadline = Utils.monotonic_time + timeout written = 0 while written < chunk.length begin written += @socket.write_nonblock(chunk[written..-1]) rescue IO::WaitWritable, Errno::EINTR unless wait_for_socket_to_be_writable(deadline) raise_timeout_error!("Took more than #{timeout} seconds to receive data", true) end retry end end written end
#write_with_timeout(*args, timeout:) ⇒ Integer (private)
Writes data to the socket, the write duration is limited to #timeout.
# File 'lib/mongo/socket.rb', line 496
def write_with_timeout(*args, timeout:) raise ArgumentError, 'timeout cannot be nil' if timeout.nil? raise_timeout_error!("Negative timeout #{timeout} given to socket", true) if timeout < 0 written = 0 args.each do |buf| buf = buf.to_s i = 0 while i < buf.length chunk = buf[i...(i + WRITE_CHUNK_SIZE)] written += write_chunk(chunk, timeout) i += WRITE_CHUNK_SIZE end end written end
#write_without_timeout(*args) ⇒ Integer (private)
Writes data to the socket.
# File 'lib/mongo/socket.rb', line 467
def write_without_timeout(*args) # This method used to forward arguments to @socket.write in a # single call like so: # # @socket.write(*args) # # Turns out, when each buffer to be written is large (e.g. 32 MiB), # this write call would take an extremely long time (20+ seconds) # while using 100% CPU. Splitting the writes into chunks produced # massively better performance (0.05 seconds to write the 32 MiB of # data on the same hardware). Unfortunately splitting the data, # one would assume, results in it being copied, but this seems to be # a much more minor issue compared to CPU cost of writing large buffers. args.each do |buf| buf = buf.to_s i = 0 while i < buf.length chunk = buf[i, WRITE_CHUNK_SIZE] i += @socket.write(chunk) end end end