Class: Mongo::Server::ConnectionPool
Relationships & Source Files | |
Namespace Children | |
Classes:
| |
Super Chains via Extension / Inclusion / Inheritance | |
Class Chain:
self,
Forwardable
|
|
Instance Chain:
|
|
Inherits: | Object |
Defined in: | lib/mongo/server/connection_pool.rb, lib/mongo/server/connection_pool/generation_manager.rb, lib/mongo/server/connection_pool/populator.rb |
Overview
Represents a connection pool for server connections.
Constant Summary
-
DEFAULT_MAX_CONNECTING =
The default maximum number of connections that can be connecting at any given time.
2
-
DEFAULT_MAX_SIZE =
The default max size for the connection pool.
20
-
DEFAULT_MIN_SIZE =
The default min size for the connection pool.
0
-
DEFAULT_WAIT_TIMEOUT =
The default timeout, in seconds, to wait for a connection.
This timeout applies while in flow threads are waiting for background threads to establish connections (and hence they must connect, handshake and auth in the allotted time).
It is currently set to 10 seconds. The default connect timeout is 10 seconds by itself, but setting large timeouts can get applications in trouble if their requests get timed out by the reverse proxy, thus anything over 15 seconds is potentially dangerous.
10.freeze
::Mongo::Loggable
- Included
Class Method Summary
-
.finalize(available_connections, pending_connections, populator) ⇒ Proc
Finalize the connection pool for garbage collection.
-
.new(server, options = {}) ⇒ ConnectionPool
constructor
Create the new connection pool.
Instance Attribute Summary
-
#closed? ⇒ true | false
readonly
Whether the pool has been closed.
- #generation_manager ⇒ Integer readonly Internal use only Internal use only
- #max_connecting readonly Internal use only Internal use only
- #options ⇒ Hash readonly
-
#paused? ⇒ true | false
readonly
A connection pool is paused if it is not closed and it is not ready.
-
#populate_semaphore
readonly
Condition variable broadcast when the size of the pool changes to wake up the populator.
- #populator readonly Internal use only Internal use only
-
#ready? ⇒ true | false
readonly
Whether the pool is ready.
- #server readonly Internal use only Internal use only
::Mongo::Monitoring::Publishable
- Included
Instance Method Summary
-
#available_count ⇒ Integer
Number of available connections in the pool.
-
#check_in(connection)
Check a connection back into the pool.
-
#check_out(connection_global_id: nil, context: nil) ⇒ Mongo::Server::Connection
Checks a connection out of the pool.
-
#clear(options = nil) ⇒ true
Closes all idle connections in the pool and schedules currently checked out connections to be closed when they are checked back into the pool.
-
#close(options = nil) ⇒ true
Marks the pool closed, closes all idle connections in the pool and schedules currently checked out connections to be closed when they are checked back into the pool.
-
#close_idle_sockets
Close sockets that have been open for longer than the max idle time,.
-
#disconnect!(options = nil)
Internal use only
Internal use only
Disconnects the pool.
-
#do_check_in(connection)
Executes the check in after having already acquired the lock.
- #do_clear(options = nil)
-
#do_pause
Internal use only
Internal use only
Mark the connection pool as paused without acquiring the lock.
-
#inspect ⇒ String
Get a pretty printed string inspection for the pool.
-
#max_idle_time ⇒ Float | nil
The maximum seconds a socket can remain idle since it has been checked in to the pool, if set.
-
#max_size ⇒ Integer
Get the maximum size of the connection pool.
-
#min_size ⇒ Integer
Get the minimum size of the connection pool.
-
#pause
Mark the connection pool as paused.
-
#populate ⇒ true | false
Internal use only
Internal use only
This method does three things: 1.
-
#ready
readonly
Instructs the pool to create and return connections.
-
#size ⇒ Integer
Size of the connection pool.
-
#stop_populator
Internal use only
Internal use only
Stop the background populator thread and clean up any connections created which have not been connected yet.
- #summary
- #unavailable_connections ⇒ Integer Internal use only Internal use only
-
#wait_timeout(context = nil) ⇒ Float
The time to wait, in seconds, for a connection to become available.
-
#with_connection(connection_global_id: nil, context: nil) ⇒ Object
Yield the block to a connection, while handling check in/check out logic.
- #check_invariants private
-
#clear_pending_connections
private
Clear and disconnect the pending connections.
-
#close_available_connections(service_id)
private
Close the available connections.
-
#connect_connection(connection, context = nil)
private
Attempts to connect (handshake and auth) the connection.
-
#connect_or_raise(connection, context)
private
Connects a connection and raises an exception if the connection cannot be connected.
-
#connection_stale_unlocked?(connection) ⇒ true | false
private
Checks whether a connection is stale.
-
#create_and_add_connection ⇒ true | false
private
Create a connection, connect it, and add it to the pool.
- #create_connection private
-
#decrement_connection_requests_and_signal
private
Decrement connection requests counter and signal the condition variables that the number of unavailable connections has decreased.
-
#get_connection(pid, connection_global_id) ⇒ Mongo::Server::Connection
private
Retrieves a connection if one is available, otherwise we create a new one.
-
#maybe_raise_pool_cleared!(connection, e)
private
If the connection was interrupted, raise a pool cleared error.
-
#next_available_connection(connection_global_id)
private
Returns the next available connection, optionally with given global id.
-
#raise_check_out_timeout!(connection_global_id)
private
The lock should be acquired when calling this method.
- #raise_check_out_timeout_locked!(connection_global_id) private
-
#raise_if_closed!
private
Asserts that the pool has not been closed.
-
#raise_if_not_ready!
private
The lock should be acquired when calling this method.
- #raise_if_pool_closed! private
- #raise_if_pool_paused! private
- #raise_if_pool_paused_locked! private
- #raise_unless_locked! private
-
#remove_interrupted_connections
private
Interrupt connections scheduled for interruption.
-
#remove_stale_connection
private
Removes and disconnects all stale available connections.
-
#retrieve_and_connect_connection(connection_global_id, context = nil) ⇒ Mongo::Server::Connection
private
Retrieves a connection and connects it.
-
#schedule_for_interruption(connections, service_id)
private
Schedule connections of previous generations for interruption.
-
#unsynchronized_size
private
Returns the size of the connection pool without acquiring the lock.
- #valid_available_connection?(connection, pid, connection_global_id) ⇒ Boolean private
-
#wait_for_connection(connection_global_id, deadline) ⇒ Mongo::Server::Connection
private
Waits for a connection to become available, or raises is no connection becomes available before the timeout.
::Mongo::Monitoring::Publishable
- Included
#publish_cmap_event, #publish_event, #publish_sdam_event, #command_completed, #command_failed, #command_started, #command_succeeded, #duration |
::Mongo::Loggable
- Included
#log_debug | Convenience method to log debug messages with the standard prefix. |
#log_error | Convenience method to log error messages with the standard prefix. |
#log_fatal | Convenience method to log fatal messages with the standard prefix. |
#log_info | Convenience method to log info messages with the standard prefix. |
#log_warn | Convenience method to log warn messages with the standard prefix. |
#logger | Get the logger instance. |
#_mongo_log_prefix, #format_message |
Constructor Details
.new(server, options = {}) ⇒ ConnectionPool
Create the new connection pool.
Note: Additionally, options for connections created by this pool should
be included in the passed here, and they will be forwarded to
any connections created by the pool.
# File 'lib/mongo/server/connection_pool.rb', line 102
def initialize(server, = {}) unless server.is_a?(Server) raise ArgumentError, 'First argument must be a Server instance' end = .dup if [:min_size] && [:min_pool_size] && [:min_size] != [:min_pool_size] raise ArgumentError, "Min size #{ [:min_size]} is not identical to min pool size #{ [:min_pool_size]}" end if [:max_size] && [:max_pool_size] && [:max_size] != [:max_pool_size] raise ArgumentError, "Max size #{ [:max_size]} is not identical to max pool size #{ [:max_pool_size]}" end if [:wait_timeout] && [:wait_queue_timeout] && [:wait_timeout] != [:wait_queue_timeout] raise ArgumentError, "Wait timeout #{ [:wait_timeout]} is not identical to wait queue timeout #{ [:wait_queue_timeout]}" end [:min_size] ||= [:min_pool_size] .delete(:min_pool_size) [:max_size] ||= [:max_pool_size] .delete(:max_pool_size) if [:min_size] && [:max_size] && ( [:max_size] != 0 && [:min_size] > [:max_size]) then raise ArgumentError, "Cannot have min size #{ [:min_size]} exceed max size #{ [:max_size]}" end if [:wait_queue_timeout] [:wait_timeout] ||= [:wait_queue_timeout] end .delete(:wait_queue_timeout) @server = server @options = .freeze @generation_manager = GenerationManager.new(server: server) @ready = false @closed = false # A connection owned by this pool should be either in the # available connections array (which is used as a stack) # or in the checked out connections set. @available_connections = available_connections = [] @checked_out_connections = Set.new @pending_connections = Set.new @interrupt_connections = [] # Mutex used for synchronizing access to @available_connections and # @checked_out_connections. The pool object is thread-safe, thus # all methods that retrieve or modify instance variables generally # must do so under this lock. @lock = Mutex.new # Background thread reponsible for maintaining the size of # the pool to at least min_size @populator = Populator.new(self, ) @populate_semaphore = Semaphore.new # Condition variable to enforce the first check in check_out: max_pool_size. # This condition variable should be signaled when the number of # unavailable connections decreases (pending + pending_connections + # checked_out_connections). @size_cv = Mongo::ConditionVariable.new(@lock) # This represents the number of threads that have made it past the size_cv # gate but have not acquired a connection to add to the pending_connections # set. @connection_requests = 0 # Condition variable to enforce the second check in check_out: max_connecting. # Thei condition variable should be signaled when the number of pending # connections decreases. @max_connecting_cv = Mongo::ConditionVariable.new(@lock) @max_connecting = .fetch(:max_connecting, DEFAULT_MAX_CONNECTING) ObjectSpace.define_finalizer(self, self.class.finalize(@available_connections, @pending_connections, @populator)) publish_cmap_event( Monitoring::Event::Cmap::PoolCreated.new(@server.address, , self) ) end
Class Method Details
.finalize(available_connections, pending_connections, populator) ⇒ Proc
Finalize the connection pool for garbage collection.
# File 'lib/mongo/server/connection_pool.rb', line 813
def self.finalize(available_connections, pending_connections, populator) proc do available_connections.each do |connection| connection.disconnect!(reason: :pool_closed) end available_connections.clear pending_connections.each do |connection| connection.disconnect!(reason: :pool_closed) end pending_connections.clear # Finalizer does not close checked out connections. # Those would have to be garbage collected on their own # and that should close them. end end
Instance Attribute Details
#closed? ⇒ true
| false
(readonly)
Whether the pool has been closed.
# File 'lib/mongo/server/connection_pool.rb', line 304
def closed? !!@closed end
#generation_manager ⇒ Integer
(readonly)
# File 'lib/mongo/server/connection_pool.rb', line 233
attr_reader :generation_manager
#max_connecting (readonly)
# File 'lib/mongo/server/connection_pool.rb', line 342
attr_reader :max_connecting
#options ⇒ Hash
(readonly)
# File 'lib/mongo/server/connection_pool.rb', line 180
attr_reader :
#paused? ⇒ true
| false
(readonly)
A connection pool is paused if it is not closed and it is not ready.
# File 'lib/mongo/server/connection_pool.rb', line 246
def paused? raise_if_closed! @lock.synchronize do !@ready end end
#populate_semaphore (readonly)
Condition variable broadcast when the size of the pool changes to wake up the populator
# File 'lib/mongo/server/connection_pool.rb', line 59
attr_reader :populate_semaphore
#populator (readonly)
# File 'lib/mongo/server/connection_pool.rb', line 339
attr_reader :populator
#ready? ⇒ true
| false
(readonly)
Whether the pool is ready.
# File 'lib/mongo/server/connection_pool.rb', line 311
def ready? @lock.synchronize do @ready end end
#server (readonly)
# File 'lib/mongo/server/connection_pool.rb', line 183
attr_reader :server
Instance Method Details
#available_count ⇒ Integer
Number of available connections in the pool.
# File 'lib/mongo/server/connection_pool.rb', line 291
def available_count raise_if_closed! @lock.synchronize do @available_connections.length end end
#check_in(connection)
Check a connection back into the pool.
The connection must have been previously created by this pool.
# File 'lib/mongo/server/connection_pool.rb', line 402
def check_in(connection) check_invariants @lock.synchronize do do_check_in(connection) end ensure check_invariants end
#check_invariants (private)
# File 'lib/mongo/server/connection_pool.rb', line 1010
def check_invariants return unless Lint.enabled? # Server summary calls pool summary which requires pool lock -> deadlock. # Obtain the server summary ahead of time. server_summary = @server.summary @lock.synchronize do @available_connections.each do |connection| if connection.closed? raise Error::LintError, "Available connection is closed: #{connection} for #{server_summary}" end end @pending_connections.each do |connection| if connection.closed? raise Error::LintError, "Pending connection is closed: #{connection} for #{server_summary}" end end end end
#check_out(connection_global_id: nil, context: nil) ⇒ Mongo::Server::Connection
Checks a connection out of the pool.
If there are active connections in the pool, the most recently used connection is returned. Otherwise if the connection pool size is less than the max size, creates a new connection and returns it. Otherwise waits up to the wait timeout and raises Timeout::Error if there are still no active connections and the pool is at max size.
The returned connection counts toward the pool’s max size. When the caller is finished using the connection, the connection should be checked back in via the check_in method.
# File 'lib/mongo/server/connection_pool.rb', line 366
def check_out(connection_global_id: nil, context: nil) check_invariants publish_cmap_event( Monitoring::Event::Cmap::ConnectionCheckOutStarted.new(@server.address) ) raise_if_pool_closed! raise_if_pool_paused_locked! connection = retrieve_and_connect_connection( connection_global_id, context ) publish_cmap_event( Monitoring::Event::Cmap::ConnectionCheckedOut.new(@server.address, connection.id, self), ) if Lint.enabled? unless connection.connected? raise Error::LintError, "Connection pool for #{address} checked out a disconnected connection #{connection.generation}:#{connection.id}" end end connection ensure check_invariants end
#clear(options = nil) ⇒ true
Closes all idle connections in the pool and schedules currently checked out connections to be closed when they are checked back into the pool. The pool is paused, it will not create new connections in background and it will fail checkout requests until marked ready.
#clear_pending_connections (private)
Clear and disconnect the pending connections.
# File 'lib/mongo/server/connection_pool.rb', line 1074
def clear_pending_connections until @pending_connections.empty? connection = @pending_connections.take(1).first connection.disconnect! @pending_connections.delete(connection) end end
#close(options = nil) ⇒ true
Marks the pool closed, closes all idle connections in the pool and schedules currently checked out connections to be closed when they are checked back into the pool. If force option is true, checked out connections are also closed. Attempts to use the pool after it is closed will raise ::Mongo::Error::PoolClosedError
.
# File 'lib/mongo/server/connection_pool.rb', line 644
def close( = nil) return if closed? ||= {} stop_populator @lock.synchronize do until @available_connections.empty? connection = @available_connections.pop connection.disconnect!(reason: :pool_closed) end if [:force] until @checked_out_connections.empty? connection = @checked_out_connections.take(1).first connection.disconnect!(reason: :pool_closed) @checked_out_connections.delete(connection) end end unless && [:stay_ready] # mark pool as closed before releasing lock so # no connections can be created, checked in, or checked out @closed = true @ready = false end @max_connecting_cv.broadcast @size_cv.broadcast end publish_cmap_event( Monitoring::Event::Cmap::PoolClosed.new(@server.address, self) ) true end
#close_available_connections(service_id) (private)
Close the available connections.
# File 'lib/mongo/server/connection_pool.rb', line 1036
def close_available_connections(service_id) if @server.load_balancer? && service_id loop do conn = @available_connections.detect do |conn| conn.service_id == service_id && conn.generation < @generation_manager.generation(service_id: service_id) end if conn @available_connections.delete(conn) conn.disconnect!(reason: :stale, interrupted: true) @populate_semaphore.signal else break end end else @available_connections.delete_if do |conn| if conn.generation < @generation_manager.generation(service_id: service_id) conn.disconnect!(reason: :stale, interrupted: true) @populate_semaphore.signal true end end end end
#close_idle_sockets
Close sockets that have been open for longer than the max idle time,
if the option is set.
# File 'lib/mongo/server/connection_pool.rb', line 734
def close_idle_sockets return if closed? return unless max_idle_time @lock.synchronize do i = 0 while i < @available_connections.length connection = @available_connections[i] if last_checkin = connection.last_checkin if (Time.now - last_checkin) > max_idle_time connection.disconnect!(reason: :idle) @available_connections.delete_at(i) @populate_semaphore.signal next end end i += 1 end end end
#connect_connection(connection, context = nil) (private)
Attempts to connect (handshake and auth) the connection. If an error is encountered, closes the connection and raises the error.
# File 'lib/mongo/server/connection_pool.rb', line 994
def connect_connection(connection, context = nil) begin connection.connect!(context) rescue Exception connection.disconnect!(reason: :error) raise end rescue Error::SocketError, Error::SocketTimeoutError => exc @server.unknown!( generation: exc.generation, service_id: exc.service_id, stop_push_monitor: true, ) raise end
#connect_or_raise(connection, context) (private)
Connects a connection and raises an exception if the connection cannot be connected. This method also publish corresponding event and ensures that counters and condition variables are updated.
# File 'lib/mongo/server/connection_pool.rb', line 1348
def connect_or_raise(connection, context) connect_connection(connection, context) rescue Exception # Handshake or authentication failed @lock.synchronize do if @pending_connections.include?(connection) @pending_connections.delete(connection) end @max_connecting_cv.signal @size_cv.signal end @populate_semaphore.signal publish_cmap_event( Monitoring::Event::Cmap::ConnectionCheckOutFailed.new( @server.address, Monitoring::Event::Cmap::ConnectionCheckOutFailed::CONNECTION_ERROR ), ) raise end
#connection_stale_unlocked?(connection) ⇒ true
| false
(private)
Checks whether a connection is stale.
# File 'lib/mongo/server/connection_pool.rb', line 957
def connection_stale_unlocked?(connection) connection.generation != generation_unlocked(service_id: connection.service_id) && !connection.pinned? end
#create_and_add_connection ⇒ true
| false
(private)
Create a connection, connect it, and add it to the pool. Also check for stale and interruptable connections and deal with them.
# File 'lib/mongo/server/connection_pool.rb', line 872
def create_and_add_connection connection = nil @lock.synchronize do if !closed? && @ready && (unsynchronized_size + @connection_requests) < min_size && @pending_connections.length < @max_connecting then connection = create_connection @pending_connections << connection else return true if remove_interrupted_connections return true if remove_stale_connection return false end end begin connect_connection(connection) rescue Exception @lock.synchronize do @pending_connections.delete(connection) @max_connecting_cv.signal @size_cv.signal end raise end @lock.synchronize do @available_connections << connection @pending_connections.delete(connection) @max_connecting_cv.signal @size_cv.signal end true end
#create_connection (private)
# File 'lib/mongo/server/connection_pool.rb', line 852
def create_connection r, _ = @generation_manager.pipe_fds(service_id: server.description.service_id) opts = .merge( connection_pool: self, pipe: r # Do not pass app metadata - this will be retrieved by the connection # based on the auth needs. ) unless @server.load_balancer? opts[:generation] = generation end Connection.new(@server, opts) end
#decrement_connection_requests_and_signal (private)
Decrement connection requests counter and signal the condition variables that the number of unavailable connections has decreased.
# File 'lib/mongo/server/connection_pool.rb', line 1372
def decrement_connection_requests_and_signal @connection_requests -= 1 @max_connecting_cv.signal @size_cv.signal end
#disconnect!(options = nil)
# File 'lib/mongo/server/connection_pool.rb', line 537
def disconnect!( = nil) do_clear( ) rescue Error::PoolClosedError # The "disconnected" state is between closed and paused. # When we are trying to disconnect the pool, permit the pool to be # already closed. end
#do_check_in(connection)
Executes the check in after having already acquired the lock.
# File 'lib/mongo/server/connection_pool.rb', line 415
def do_check_in(connection) # When a connection is interrupted it is checked back into the pool # and closed. The operation that was using the connection before it was # interrupted will attempt to check it back into the pool, and we # should ignore it since its already been closed and removed from the pool. return if connection.closed? && connection.interrupted? unless connection.connection_pool == self raise ArgumentError, "Trying to check in a connection which was not checked out by this pool: #{connection} checked out from pool #{connection.connection_pool} (for #{self})" end unless @checked_out_connections.include?(connection) raise ArgumentError, "Trying to check in a connection which is not currently checked out by this pool: #{connection} (for #{self})" end # Note: if an event handler raises, resource will not be signaled. # This means threads waiting for a connection to free up when # the pool is at max size may time out. # Threads that begin waiting after this method completes (with # the exception) should be fine. @checked_out_connections.delete(connection) @size_cv.signal publish_cmap_event( Monitoring::Event::Cmap::ConnectionCheckedIn.new(@server.address, connection.id, self) ) if connection.interrupted? connection.disconnect!(reason: :stale) return end if connection.error? connection.disconnect!(reason: :error) return end if closed? connection.disconnect!(reason: :pool_closed) return end if connection.closed? # Connection was closed - for example, because it experienced # a network error. Nothing else needs to be done here. @populate_semaphore.signal elsif connection.generation != generation(service_id: connection.service_id) && !connection.pinned? # If connection is marked as pinned, it is used by a transaction # or a series of cursor operations in a load balanced setup. # In this case connection should not be disconnected until # unpinned. connection.disconnect!(reason: :stale) @populate_semaphore.signal else connection.record_checkin! @available_connections << connection @max_connecting_cv.signal end end
#do_clear(options = nil)
# File 'lib/mongo/server/connection_pool.rb', line 545
def do_clear( = nil) check_invariants service_id = && [:service_id] @lock.synchronize do # Generation must be bumped before emitting pool cleared event. @generation_manager.bump(service_id: service_id) unless && [:lazy] close_available_connections(service_id) end if && [:interrupt_in_use_connections] schedule_for_interruption(@checked_out_connections, service_id) schedule_for_interruption(@pending_connections, service_id) end if @ready publish_cmap_event( Monitoring::Event::Cmap::PoolCleared.new( @server.address, service_id: service_id, interrupt_in_use_connections: &.[](:interrupt_in_use_connections) ) ) # Only pause the connection pool if the server was marked unknown, # otherwise, allow the retry to be attempted with a ready pool. do_pause if !@server.load_balancer? && @server.unknown? end # Broadcast here to cause all of the threads waiting on the max # connecting to break out of the wait loop and error. @max_connecting_cv.broadcast # Broadcast here to cause all of the threads waiting on the pool size # to break out of the wait loop and error. @size_cv.broadcast end # "Schedule the background thread" after clearing. This is responsible # for cleaning up stale threads, and interrupting in use connections. @populate_semaphore.signal true ensure check_invariants end
#do_pause
Mark the connection pool as paused without acquiring the lock.
#get_connection(pid, connection_global_id) ⇒ Mongo::Server::Connection (private)
Retrieves a connection if one is available, otherwise we create a new one. If no connection exists and the pool is at max size, wait until a connection is checked back into the pool.
# File 'lib/mongo/server/connection_pool.rb', line 1209
def get_connection(pid, connection_global_id) if connection = next_available_connection(connection_global_id) unless valid_available_connection?(connection, pid, connection_global_id) return nil end # We've got a connection, so we decrement the number of connection # requests. # We do not need to signal condition variable here, because # because the execution will continue, and we signal later. @connection_requests -= 1 # If the connection is connected, it's not considered a # "pending connection". The pending_connections list represents # the set of connections that are awaiting connection. unless connection.connected? @pending_connections << connection end return connection elsif connection_global_id && @server.load_balancer? # A particular connection is requested, but it is not available. # If it is nether available not checked out, we should stop here. @checked_out_connections.detect do |conn| conn.global_id == connection_global_id end.tap do |conn| if conn.nil? publish_cmap_event( Monitoring::Event::Cmap::ConnectionCheckOutFailed.new( @server.address, Monitoring::Event::Cmap::ConnectionCheckOutFailed::CONNECTION_ERROR ), ) # We're going to raise, so we need to decrement the number of # connection requests. decrement_connection_requests_and_signal raise Error::MissingConnection.new end end # We need a particular connection, and if it is not available # we can wait for an in-progress operation to return # such a connection to the pool. nil else connection = create_connection @connection_requests -= 1 @pending_connections << connection return connection end end
#inspect ⇒ String
Get a pretty printed string inspection for the pool.
# File 'lib/mongo/server/connection_pool.rb', line 691
def inspect if closed? "#<Mongo::Server::ConnectionPool:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " + "wait_timeout=#{wait_timeout} closed>" elsif !ready? "#<Mongo::Server::ConnectionPool:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " + "wait_timeout=#{wait_timeout} paused>" else "#<Mongo::Server::ConnectionPool:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " + "wait_timeout=#{wait_timeout} current_size=#{size} available=#{available_count}>" end end
#max_idle_time ⇒ Float
| nil
The maximum seconds a socket can remain idle since it has been checked in to the pool, if set.
# File 'lib/mongo/server/connection_pool.rb', line 228
def max_idle_time @max_idle_time ||= [:max_idle_time] end
#max_size ⇒ Integer
Get the maximum size of the connection pool.
# File 'lib/mongo/server/connection_pool.rb', line 193
def max_size @max_size ||= [:max_size] || [DEFAULT_MAX_SIZE, min_size].max end
#maybe_raise_pool_cleared!(connection, e) (private)
If the connection was interrupted, raise a pool cleared error. If it wasn’t interrupted raise the original error.
# File 'lib/mongo/server/connection_pool.rb', line 981
def maybe_raise_pool_cleared!(connection, e) if connection&.interrupted? err = Error::PoolClearedError.new(connection.server.address, connection.server.pool_internal).tap do |err| e.labels.each { |l| err.add_label(l) } end raise err else raise e end end
#min_size ⇒ Integer
Get the minimum size of the connection pool.
# File 'lib/mongo/server/connection_pool.rb', line 202
def min_size @min_size ||= [:min_size] || DEFAULT_MIN_SIZE end
#next_available_connection(connection_global_id) (private)
Returns the next available connection, optionally with given global id. If no suitable connections are available, returns nil.
# File 'lib/mongo/server/connection_pool.rb', line 836
def next_available_connection(connection_global_id) raise_unless_locked! if @server.load_balancer? && connection_global_id conn = @available_connections.detect do |conn| conn.global_id == connection_global_id end if conn @available_connections.delete(conn) end conn else @available_connections.pop end end
#pause
Mark the connection pool as paused.
# File 'lib/mongo/server/connection_pool.rb', line 478
def pause raise_if_closed! check_invariants @lock.synchronize do do_pause end ensure check_invariants end
#populate ⇒ true
| false
This method does three things:
-
Creates and adds a connection to the pool, if the pool’s size is below min_size. Retries once if a socket-related error is encountered during this process and raises if a second error or a non socket-related error occurs.
-
Removes stale connections from the connection pool.
-
Interrupts connections marked for interruption.
Used by the pool populator background thread.
occured, or the non socket-related error
# File 'lib/mongo/server/connection_pool.rb', line 792
def populate return false if closed? begin return create_and_add_connection rescue Error::SocketError, Error::SocketTimeoutError => e # an error was encountered while connecting the connection, # ignore this first error and try again. log_warn("Populator failed to connect a connection for #{address}: #{e.class}: #{e}. It will retry.") end return create_and_add_connection end
#raise_check_out_timeout!(connection_global_id) (private)
The lock should be acquired when calling this method.
# File 'lib/mongo/server/connection_pool.rb', line 1083
def raise_check_out_timeout!(connection_global_id) raise_unless_locked! publish_cmap_event( Monitoring::Event::Cmap::ConnectionCheckOutFailed.new( @server.address, Monitoring::Event::Cmap::ConnectionCheckOutFailed::TIMEOUT, ), ) connection_global_id_msg = if connection_global_id " for connection #{connection_global_id}" else '' end msg = "Timed out attempting to check out a connection " + "from pool for #{@server.address}#{connection_global_id_msg} after #{wait_timeout} sec. " + "Connections in pool: #{@available_connections.length} available, " + "#{@checked_out_connections.length} checked out, " + "#{@pending_connections.length} pending, " + "#{@connection_requests} connections requests " + "(max size: #{max_size})" raise Error::ConnectionCheckOutTimeout.new(msg, address: @server.address) end
#raise_check_out_timeout_locked!(connection_global_id) (private)
# File 'lib/mongo/server/connection_pool.rb', line 1109
def raise_check_out_timeout_locked!(connection_global_id) @lock.synchronize do raise_check_out_timeout!(connection_global_id) end end
#raise_if_closed! (private)
Asserts that the pool has not been closed.
# File 'lib/mongo/server/connection_pool.rb', line 967
def raise_if_closed! if closed? raise Error::PoolClosedError.new(@server.address, self) end end
#raise_if_not_ready! (private)
The lock should be acquired when calling this method.
# File 'lib/mongo/server/connection_pool.rb', line 1150
def raise_if_not_ready! raise_unless_locked! raise_if_pool_closed! raise_if_pool_paused! end
#raise_if_pool_closed! (private)
# File 'lib/mongo/server/connection_pool.rb', line 1115
def raise_if_pool_closed! if closed? publish_cmap_event( Monitoring::Event::Cmap::ConnectionCheckOutFailed.new( @server.address, Monitoring::Event::Cmap::ConnectionCheckOutFailed::POOL_CLOSED ), ) raise Error::PoolClosedError.new(@server.address, self) end end
#raise_if_pool_paused! (private)
# File 'lib/mongo/server/connection_pool.rb', line 1127
def raise_if_pool_paused! raise_unless_locked! if !@ready publish_cmap_event( Monitoring::Event::Cmap::ConnectionCheckOutFailed.new( @server.address, # CMAP spec decided to conflate pool paused with all the other # possible non-timeout errors. Monitoring::Event::Cmap::ConnectionCheckOutFailed::CONNECTION_ERROR, ), ) raise Error::PoolPausedError.new(@server.address, self) end end
#raise_if_pool_paused_locked! (private)
# File 'lib/mongo/server/connection_pool.rb', line 1143
def raise_if_pool_paused_locked! @lock.synchronize do raise_if_pool_paused! end end
#raise_unless_locked! (private)
# File 'lib/mongo/server/connection_pool.rb', line 1156
def raise_unless_locked! unless @lock.owned? raise ArgumentError, "the lock must be owned when calling this method" end end
#ready (readonly)
Instructs the pool to create and return connections.
# File 'lib/mongo/server/connection_pool.rb', line 593
def ready raise_if_closed! # TODO: Add this back in RUBY-3174. # if Lint.enabled? # unless @server.connected? # raise Error::LintError, "Attempting to ready a pool for server #{@server.summary} which is disconnected" # end # end @lock.synchronize do return if @ready @ready = true end # Note that the CMAP spec demands serialization of CMAP events for a # pool. In order to implement this, event publication must be done into # a queue which is synchronized, instead of subscribers being invoked # from the trigger method like this one here inline. On MRI, assuming # the threads yield to others when they stop having work to do, it is # likely that the events would in practice always be published in the # required order. JRuby, being truly concurrent with OS threads, # would not offers such a guarantee. publish_cmap_event( Monitoring::Event::Cmap::PoolReady.new(@server.address, , self) ) if .fetch(:populator_io, true) if @populator.running? @populate_semaphore.signal else @populator.run! end end end
#remove_interrupted_connections (private)
Interrupt connections scheduled for interruption.
# File 'lib/mongo/server/connection_pool.rb', line 920
def remove_interrupted_connections return false if @interrupt_connections.empty? gens = Set.new while conn = @interrupt_connections.pop if @checked_out_connections.include?(conn) # If the connection has been checked out, mark it as interrupted and it will # be disconnected on check in. conn.interrupted! do_check_in(conn) elsif @pending_connections.include?(conn) # If the connection is pending, disconnect with the interrupted flag. conn.disconnect!(reason: :stale, interrupted: true) @pending_connections.delete(conn) end gens << [ conn.generation, conn.service_id ] end # Close the write side of the pipe. Pending connections might be # hanging on the Kernel#select call, so in order to interrupt that, # we also listen for the read side of the pipe in Kernel#select and # close the write side of the pipe here, which will cause select to # wake up and raise an IOError now that the socket is closed. # The read side of the pipe will be scheduled for closing on the next # generation bump. gens.each do |gen, service_id| @generation_manager.remove_pipe_fds(gen, service_id: service_id) end true end
#remove_stale_connection (private)
Removes and disconnects all stale available connections.
# File 'lib/mongo/server/connection_pool.rb', line 911
def remove_stale_connection if conn = @available_connections.detect(&method(:connection_stale_unlocked?)) conn.disconnect!(reason: :stale) @available_connections.delete(conn) return true end end
#retrieve_and_connect_connection(connection_global_id, context = nil) ⇒ Mongo::Server::Connection (private)
Retrieves a connection and connects it.
# File 'lib/mongo/server/connection_pool.rb', line 1271
def retrieve_and_connect_connection(connection_global_id, context = nil) deadline = Utils.monotonic_time + wait_timeout(context) connection = nil @lock.synchronize do # The first gate to checking out a connection. Make sure the number of # unavailable connections is less than the max pool size. until max_size == 0 || unavailable_connections < max_size wait = deadline - Utils.monotonic_time raise_check_out_timeout!(connection_global_id) if wait <= 0 @size_cv.wait(wait) raise_if_not_ready! end @connection_requests += 1 connection = wait_for_connection(connection_global_id, deadline) end connect_or_raise(connection, context) unless connection.connected? @lock.synchronize do @checked_out_connections << connection if @pending_connections.include?(connection) @pending_connections.delete(connection) end @max_connecting_cv.signal # no need to signal size_cv here since the number of unavailable # connections is unchanged. end connection end
#schedule_for_interruption(connections, service_id) (private)
Schedule connections of previous generations for interruption.
# File 'lib/mongo/server/connection_pool.rb', line 1066
def schedule_for_interruption(connections, service_id) @interrupt_connections += connections.select do |conn| (!server.load_balancer? || conn.service_id == service_id) && conn.generation < @generation_manager.generation(service_id: service_id) end end
#size ⇒ Integer
Size of the connection pool.
Includes available and checked out connections.
# File 'lib/mongo/server/connection_pool.rb', line 261
def size raise_if_closed! @lock.synchronize do unsynchronized_size end end
#stop_populator
Stop the background populator thread and clean up any connections created which have not been connected yet.
Used when closing the pool or when terminating the bg thread for testing purposes. In the latter case, this method must be called before the pool is used, to ensure no connections in pending_connections were created in-flow by the check_out method.
# File 'lib/mongo/server/connection_pool.rb', line 764
def stop_populator @populator.stop! @lock.synchronize do # If stop_populator is called while populate is running, there may be # connections waiting to be connected, connections which have not yet # been moved to available_connections, or connections moved to available_connections # but not deleted from pending_connections. These should be cleaned up. clear_pending_connections end end
#summary
This method is experimental and subject to change.
# File 'lib/mongo/server/connection_pool.rb', line 321
def summary @lock.synchronize do state = if closed? 'closed' elsif !@ready 'paused' else 'ready' end "#<ConnectionPool size=#{unsynchronized_size} (#{min_size}-#{max_size}) " + "used=#{@checked_out_connections.length} avail=#{@available_connections.length} pending=#{@pending_connections.length} #{state}>" end end
#unsynchronized_size (private)
Returns the size of the connection pool without acquiring the lock. This method should only be used by other pool methods when they are already holding the lock as Ruby does not allow a thread holding a lock to acquire this lock again.
# File 'lib/mongo/server/connection_pool.rb', line 273
def unsynchronized_size @available_connections.length + @checked_out_connections.length + @pending_connections.length end
#valid_available_connection?(connection, pid, connection_global_id) ⇒ Boolean
(private)
# File 'lib/mongo/server/connection_pool.rb', line 1162
def valid_available_connection?(connection, pid, connection_global_id) if connection.pid != pid log_warn("Detected PID change - Mongo client should have been reconnected (old pid #{connection.pid}, new pid #{pid}") connection.disconnect!(reason: :stale) @populate_semaphore.signal return false end if !connection.pinned? # If connection is marked as pinned, it is used by a transaction # or a series of cursor operations in a load balanced setup. # In this case connection should not be disconnected until # unpinned. if connection.generation != generation( service_id: connection.service_id ) # Stale connections should be disconnected in the clear # method, but if any don't, check again here connection.disconnect!(reason: :stale) @populate_semaphore.signal return false end if max_idle_time && connection.last_checkin && Time.now - connection.last_checkin > max_idle_time then connection.disconnect!(reason: :idle) @populate_semaphore.signal return false end end true end
#wait_for_connection(connection_global_id, deadline) ⇒ Mongo::Server::Connection (private)
Waits for a connection to become available, or raises is no connection becomes available before the timeout.
# File 'lib/mongo/server/connection_pool.rb', line 1310
def wait_for_connection(connection_global_id, deadline) connection = nil while connection.nil? # The second gate to checking out a connection. Make sure 1) there # exists an available connection and 2) we are under max_connecting. until @available_connections.any? || @pending_connections.length < @max_connecting wait = deadline - Utils.monotonic_time if wait <= 0 # We are going to raise a timeout error, so the connection # request is not going to be fulfilled. Decrement the counter # here. decrement_connection_requests_and_signal raise_check_out_timeout!(connection_global_id) end @max_connecting_cv.wait(wait) # We do not need to decrement the connection_requests counter # or signal here because the pool is not ready yet. raise_if_not_ready! end connection = get_connection(Process.pid, connection_global_id) wait = deadline - Utils.monotonic_time if connection.nil? && wait <= 0 # connection is nil here, it means that get_connection method # did not create a new connection; therefore, it did not decrease # the connection_requests counter. We need to do it here. decrement_connection_requests_and_signal raise_check_out_timeout!(connection_global_id) end end connection end
#wait_timeout(context = nil) ⇒ Float
The time to wait, in seconds, for a connection to become available.
# File 'lib/mongo/server/connection_pool.rb', line 214
def wait_timeout(context = nil) if context&.remaining_timeout_sec.nil? [:wait_timeout] || DEFAULT_WAIT_TIMEOUT else context&.remaining_timeout_sec end end
#with_connection(connection_global_id: nil, context: nil) ⇒ Object
Yield the block to a connection, while handling check in/check out logic.
# File 'lib/mongo/server/connection_pool.rb', line 714
def with_connection(connection_global_id: nil, context: nil) raise_if_closed! connection = check_out( connection_global_id: connection_global_id, context: context ) yield(connection) rescue Error::SocketError, Error::SocketTimeoutError, Error::ConnectionPerished => e maybe_raise_pool_cleared!(connection, e) ensure if connection check_in(connection) end end