Class: Mongo::Client
Relationships & Source Files | |
Super Chains via Extension / Inclusion / Inheritance | |
Class Chain:
self,
Forwardable
|
|
Instance Chain:
self,
Loggable
|
|
Inherits: | Object |
Defined in: | lib/mongo/client.rb |
Overview
The client is the entry point to the driver and is the main object that will be interacted with.
Constant Summary
-
CRUD_OPTIONS =
The options that do not affect the behavior of a cluster and its subcomponents.
[ :, :database, :read, :read_concern, :write, :write_concern, :retry_reads, :max_read_retries, :read_retry_interval, :retry_writes, :max_write_retries, # Options which cannot currently be here: # # :server_selection_timeout # Server selection timeout is used by cluster constructor to figure out # how long to wait for initial scan in compatibility mode, but once # the cluster is initialized it no longer uses this timeout. # Unfortunately server selector reads server selection timeout out of # the cluster, and this behavior is required by Cluster#next_primary # which takes no arguments. When next_primary is removed we can revsit # using the same cluster object with different server selection timeouts. ].freeze
-
VALID_COMPRESSORS =
The compression algorithms supported by the driver.
[ Mongo::Protocol::Compressed::ZSTD, Mongo::Protocol::Compressed::SNAPPY, Mongo::Protocol::Compressed::ZLIB ].freeze
-
VALID_OPTIONS =
Valid client options.
[ :app_name, :auth_mech, :auth_mech_properties, :auth_source, :, :bg_error_backtrace, :cleanup, :compressors, :direct_connection, :connect, :connect_timeout, :database, :heartbeat_frequency, :id_generator, :load_balanced, :local_threshold, :logger, :log_prefix, :max_connecting, :max_idle_time, :max_pool_size, :max_read_retries, :max_write_retries, :min_pool_size, :monitoring, :monitoring_io, :password, :platform, :populator_io, :read, :read_concern, :read_retry_interval, :replica_set, :, :retry_reads, :retry_writes, :scan, :sdam_proc, :server_api, :server_selection_timeout, :socket_timeout, :srv_max_hosts, :srv_service_name, :ssl, :ssl_ca_cert, :ssl_ca_cert_object, :ssl_ca_cert_string, :ssl_cert, :ssl_cert_object, :ssl_cert_string, :ssl_key, :ssl_key_object, :ssl_key_pass_phrase, :ssl_key_string, :ssl_verify, :ssl_verify_certificate, :ssl_verify_hostname, :ssl_verify_ocsp_endpoint, :timeout_ms, :truncate_logs, :user, :wait_queue_timeout, :wrapping_libraries, :write, :write_concern, :zlib_compression_level, ].freeze
-
VALID_SERVER_API_VERSIONS =
The known server API versions.
%w( 1 ).freeze
Loggable
- Included
Class Method Summary
-
.canonicalize_ruby_options(options)
Internal use only
Internal use only
Lowercases auth mechanism properties, if given, in the specified options, then converts the options to an instance of
Options::Redacted
. -
.new(addresses_or_uri, options = nil) ⇒ Client
constructor
Instantiate a new driver client.
Instance Attribute Summary
- #closed? ⇒ Boolean readonly
- #cluster ⇒ Mongo::Cluster readonly
- #database ⇒ Mongo::Database readonly
- #encrypter ⇒ Mongo::Crypt::AutoEncrypter readonly
- #options ⇒ Hash readonly
Instance Method Summary
-
#==(other) ⇒ true, false
(also: #eql?)
Determine if this client is equivalent to another object.
-
#[](collection_name, options = {}) ⇒ Mongo::Collection
Get a collection object for the provided collection name.
-
#close ⇒ true
Close all connections.
-
#close_encrypter ⇒ true
Close encrypter and clean up auto-encryption resources.
- #cluster_options Internal use only Internal use only
-
#database_names(filter = {}, opts = {}) ⇒ Array<String>
Get the names of all databases.
-
#encrypted_fields_map ⇒ Hash | nil
Internal use only
Internal use only
Returns encrypted field map hash if provided when creating the client.
-
#eql?(other)
Alias for #==.
-
#get_session(options = {}) ⇒ Session | nil
Internal use only
Internal use only
Returns a session to use for operations if possible.
-
#hash ⇒ Integer
Get the hash value of the client.
-
#inspect ⇒ String
Get an inspection of the client as a string.
-
#list_databases(filter = {}, name_only = false, opts = {}) ⇒ Array<Hash>
Get info for each database.
-
#list_mongo_databases(filter = {}, opts = {}) ⇒ Array<Mongo::Database>
Returns a list of
Database
objects. -
#max_read_retries ⇒ Integer
Internal use only
Internal use only
Get the maximum number of times the client can retry a read operation when using legacy read retries.
-
#max_write_retries ⇒ Integer
Internal use only
Internal use only
Get the maximum number of times the client can retry a write operation when using legacy write retries.
-
#read_concern ⇒ Hash
Get the read concern for this client.
-
#read_preference ⇒ BSON::Document
Get the read preference from the options passed to the client.
-
#read_retry_interval ⇒ Float
Internal use only
Internal use only
Get the interval, in seconds, in which read retries when using legacy read retries.
-
#reconnect ⇒ true
Reconnect the client.
-
#server_selector ⇒ Mongo::ServerSelector
Get the server selector.
-
#start_session(options = {}) ⇒ Session
Start a session.
-
#summary ⇒ String
Get a summary of the client state.
- #timeout_ms ⇒ Integer | nil Internal use only Internal use only
- #timeout_sec ⇒ Float | nil Internal use only Internal use only
-
#update_options(new_options) ⇒ Hash
Internal use only
Internal use only
Updates this client’s options from new_options, validating all options.
-
#use(name) ⇒ Mongo::Client
Creates a new client configured to use the database with the provided name, and using the other options configured in this client.
-
#watch(pipeline = [], options = {}) ⇒ ChangeStream
As of version 3.6 of the MongoDB server, a “$changeStream“ pipeline stage is supported in the aggregation framework.
-
#with(new_options = nil) ⇒ Mongo::Client
Creates a new client with the passed options merged over the existing options of this client.
-
#with_session(options = {}, &block)
Internal use only
Internal use only
Creates a session to use for operations if possible and yields it to the provided block.
-
#write_concern ⇒ Mongo::WriteConcern
Get the write concern for this client.
- #assert_not_closed private
-
#build_encrypter
private
Create a new encrypter object using the client’s auto encryption options.
- #cluster_modifying?(new_options) ⇒ Boolean private
-
#default_options(options)
private
Generate default client options based on the
URI
and options passed into theClient
constructor. -
#do_close
private
Implementation for #close, assumes the connect lock is already acquired.
-
#get_session!(options = {}) ⇒ Session
private
Internal use only
Internal use only
Returns a session to use for operations.
-
#initialize_copy(original)
private
Internal use only
Internal use only
Auxiliary method that is called by interpreter when copying the client via dup or clone.
- #monitoring ⇒ Monitoring private Internal use only Internal use only
-
#process_addresses(addresses, options) ⇒ Hash<:uri, :addresses, :options>
private
Attempts to parse the given list of addresses, using the provided options.
-
#process_addresses_array(addresses, options) ⇒ Hash<:uri, :addresses, :options>
private
Attempts to parse the given list of addresses, using the provided options.
-
#process_addresses_string(addresses, options) ⇒ Hash<:uri, :addresses, :options>
private
Attempts to parse the given list of addresses, using the provided options.
- #valid_compressors(compressors) private
-
#validate_authentication_options!
private
Validates all authentication-related options after they are set on the client This method is intended to catch combinations of options which are not allowed.
-
#validate_max_connecting!(option, opts) ⇒ true
private
Validates whether the max_connecting option is valid.
- #validate_max_min_pool_size!(option, opts) private
-
#validate_new_options!(opts)
private
Validates options in the provided argument for validity.
-
#validate_options!(addresses = nil, is_srv: nil)
private
Validates all options after they are set on the client.
- #validate_read!(option, opts) private
- #validate_snappy_compression! private
- #validate_zstd_compression! private
Loggable
- Included
#log_debug | Convenience method to log debug messages with the standard prefix. |
#log_error | Convenience method to log error messages with the standard prefix. |
#log_fatal | Convenience method to log fatal messages with the standard prefix. |
#log_info | Convenience method to log info messages with the standard prefix. |
#log_warn | Convenience method to log warn messages with the standard prefix. |
#logger | Get the logger instance. |
#_mongo_log_prefix, #format_message |
Constructor Details
.new(addresses_or_uri, options = nil) ⇒ Client
Instantiate a new driver client.
# File 'lib/mongo/client.rb', line 502
def initialize(addresses_or_uri, = nil) = ? .dup : {} processed = process_addresses(addresses_or_uri, ) uri = processed[:uri] addresses = processed[:addresses] = processed[: ] # If the URI is an SRV URI, note this so that we can start # SRV polling if the topology is a sharded cluster. srv_uri = uri if uri.is_a?(URI::SRVProtocol) = self.class. ( ) # The server API version is specified to be a string. # However, it is very annoying to always provide the number 1 as a string, # therefore cast to the string type here. if server_api = [:server_api] if server_api.is_a?(Hash) server_api = Options::Redacted.new(server_api) if (version = server_api[:version]).is_a?(Integer) [:server_api] = server_api.merge(version: version.to_s) end end end # Special handling for sdam_proc as it is only used during client # construction sdam_proc = .delete(:sdam_proc) # For gssapi service_name, the default option is given in a hash # (one level down from the top level). = ( ) .each do |k, v| default_v = [k] if Hash === default_v v = default_v.merge(v) end [k] = v end = .keys.each do |k| if [k].nil? .delete(k) end end @options = ( ) =begin WriteConcern object support if @options[:write_concern].is_a?(WriteConcern::Base) # Cache the instance so that we do not needlessly reconstruct it. @write_concern = @options[:write_concern] @options[:write_concern] = @write_concern.options end =end @options.freeze (addresses, is_srv: uri.is_a?(URI::SRVProtocol)) = @options.dup .delete(:server_api) @database = Database.new(self, @options[:database], ) # Temporarily set monitoring so that event subscriptions can be # set up without there being a cluster @monitoring = Monitoring.new(@options) if sdam_proc sdam_proc.call(self) end @connect_lock = Mutex.new @connect_lock.synchronize do @cluster = Cluster.new(addresses, @monitoring, .merge(srv_uri: srv_uri)) end begin # Unset monitoring, it will be taken out of cluster from now on remove_instance_variable('@monitoring') if @options[: ] @connect_lock.synchronize do build_encrypter end end rescue begin @cluster.close rescue => e log_warn("Eror closing cluster in client constructor's exception handler: #{e.class}: #{e}") # Drop this exception so that the original exception is raised end raise end if block_given? begin yield(self) ensure close end end end
Class Method Details
.canonicalize_ruby_options(options)
Lowercases auth mechanism properties, if given, in the specified options, then converts the options to an instance of Options::Redacted
.
Instance Attribute Details
#closed? ⇒ Boolean
(readonly)
# File 'lib/mongo/client.rb', line 856
def closed? !!@closed end
#cluster ⇒ Mongo::Cluster (readonly)
# File 'lib/mongo/client.rb', line 139
attr_reader :cluster
#database ⇒ Mongo::Database (readonly)
# File 'lib/mongo/client.rb', line 142
attr_reader :database
#encrypter ⇒ Mongo::Crypt::AutoEncrypter (readonly)
# File 'lib/mongo/client.rb', line 149
attr_reader :encrypter
#options ⇒ Hash
(readonly)
# File 'lib/mongo/client.rb', line 145
attr_reader :
Instance Method Details
#==(other) ⇒ true
, false
Also known as: #eql?
Determine if this client is equivalent to another object.
#[](collection_name, options = {}) ⇒ Mongo::Collection
Get a collection object for the provided collection name.
# File 'lib/mongo/client.rb', line 195
def [](collection_name, = {}) database[collection_name, ] end
#assert_not_closed (private)
# File 'lib/mongo/client.rb', line 1707
def assert_not_closed if closed? raise Error::ClientClosed, "The client was closed and is not usable for operations. Call #reconnect to reset this client instance or create a new client instance" end end
#build_encrypter (private)
Create a new encrypter object using the client’s auto encryption options
# File 'lib/mongo/client.rb', line 1268
def build_encrypter @encrypter = Crypt::AutoEncrypter.new( @options[: ].merge(client: self) ) end
#close ⇒ true
Close all connections.
# File 'lib/mongo/client.rb', line 865
def close @connect_lock.synchronize do @closed = true do_close end true end
#close_encrypter ⇒ true
Close encrypter and clean up auto-encryption resources.
# File 'lib/mongo/client.rb', line 876
def close_encrypter @encrypter.close if @encrypter true end
#cluster_modifying?(new_options) ⇒ Boolean
(private)
# File 'lib/mongo/client.rb', line 1352
def ( ) = .reject do |name| CRUD_OPTIONS.include?(name.to_sym) end .any? do |name, value| [name] != value end end
#cluster_options
# File 'lib/mongo/client.rb', line 611
def # We share clusters when a new client with different CRUD_OPTIONS # is requested; therefore, cluster should not be getting any of these # options upon instantiation .reject do |key, value| CRUD_OPTIONS.include?(key.to_sym) end.merge( # but need to put the database back in for auth... database: [:database], # Put these options in for legacy compatibility, but note that # their values on the client and the cluster do not have to match - # applications should read these values from client, not from cluster max_read_retries: [:max_read_retries], read_retry_interval: [:read_retry_interval], ).tap do || # If the client has a cluster already, forward srv_uri to the new # cluster to maintain SRV monitoring. If the client is brand new, # its constructor sets srv_uri manually. if cluster .update(srv_uri: cluster. [:srv_uri]) end end end
#database_names(filter = {}, opts = {}) ⇒ Array
<String
>
Get the names of all databases.
# File 'lib/mongo/client.rb', line 932
def database_names(filter = {}, opts = {}) list_databases(filter, true, opts).collect{ |info| info['name'] } end
#default_options(options) (private)
Generate default client options based on the URI
and options passed into the Client
constructor.
# File 'lib/mongo/client.rb', line 1276
def ( ) Database::DEFAULT_OPTIONS.dup.tap do || if [:auth_mech] || [:user] [:auth_source] = Auth::User.default_auth_source( ) end if [:auth_mech] == :gssapi [:auth_mech_properties] = { service_name: 'mongodb' } end [:retry_reads] = true [:retry_writes] = true end end
#do_close (private)
Implementation for #close, assumes the connect lock is already acquired.
# File 'lib/mongo/client.rb', line 1292
def do_close @cluster.close close_encrypter end
#encrypted_fields_map ⇒ Hash
| nil
Returns encrypted field map hash if provided when creating the client.
# File 'lib/mongo/client.rb', line 1178
def encrypted_fields_map @encrypted_fields_map ||= @options.fetch(:, {})[:encrypted_fields_map] end
#eql?(other)
Alias for #==.
# File 'lib/mongo/client.rb', line 182
alias_method :eql?, :==
#get_session(options = {}) ⇒ Session | nil
Returns a session to use for operations if possible.
If :session
option is set, validates that session and returns it. Otherwise, if deployment supports sessions, creates a new session and returns it. When a new session is created, the session will be implicit (lifecycle is managed by the driver) if the :implicit
option is given, otherwise the session will be explicit (lifecycle managed by the application). If deployment does not support session, returns nil.
# File 'lib/mongo/client.rb', line 1118
def get_session( = {}) get_session!( ) rescue Error::SessionsNotSupported nil end
#get_session!(options = {}) ⇒ Session (private)
Returns a session to use for operations.
If :session
option is set, validates that session and returns it. Otherwise, if deployment supports sessions, creates a new session and returns it. When a new session is created, the session will be implicit (lifecycle is managed by the driver) if the :implicit
option is given, otherwise the session will be explicit (lifecycle managed by the application). If deployment does not support session, raises Error::InvalidSession
.
# File 'lib/mongo/client.rb', line 1319
def get_session!( = {}) if [:session] return [:session].validate!(self) end cluster.validate_session_support!(timeout: timeout_sec) = {implicit: true}.update( ) server_session = if [:implicit] nil else cluster.session_pool.checkout end Session.new(server_session, self, ) end
#hash ⇒ Integer
Get the hash value of the client.
# File 'lib/mongo/client.rb', line 207
def hash [cluster, ].hash end
#initialize_copy(original) (private)
Auxiliary method that is called by interpreter when copying the client via dup or clone.
# File 'lib/mongo/client.rb', line 1343
def initialize_copy(original) @options = original. .dup @connect_lock = Mutex.new @monitoring = @cluster ? monitoring : Monitoring.new( ) @database = nil @read_preference = nil @write_concern = nil end
#inspect ⇒ String
Get an inspection of the client as a string.
#list_databases(filter = {}, name_only = false, opts = {}) ⇒ Array
<Hash
>
Get info for each database.
See https://mongodb.com/docs/manual/reference/command/listDatabases/
for more information and usage.
# File 'lib/mongo/client.rb', line 963
def list_databases(filter = {}, name_only = false, opts = {}) cmd = { listDatabases: 1 } cmd[:nameOnly] = !!name_only cmd[:filter] = filter unless filter.empty? cmd[: ] = true if opts[: ] use(Database::ADMIN).database.read_command(cmd, opts).first[Database::DATABASES] end
#list_mongo_databases(filter = {}, opts = {}) ⇒ Array
<Mongo::Database>
Returns a list of Database
objects.
# File 'lib/mongo/client.rb', line 986
def list_mongo_databases(filter = {}, opts = {}) database_names(filter, opts).collect do |name| Database.new(self, name, ) end end
#max_read_retries ⇒ Integer
Get the maximum number of times the client can retry a read operation when using legacy read retries.
# File 'lib/mongo/client.rb', line 642
def max_read_retries [:max_read_retries] || Cluster::MAX_READ_RETRIES end
#max_write_retries ⇒ Integer
Get the maximum number of times the client can retry a write operation when using legacy write retries.
# File 'lib/mongo/client.rb', line 662
def max_write_retries [:max_write_retries] || Cluster::MAX_WRITE_RETRIES end
#monitoring ⇒ Monitoring (private)
#process_addresses(addresses, options) ⇒ Hash
<:uri
, :addresses
, :options
> (private)
Attempts to parse the given list of addresses, using the provided options.
# File 'lib/mongo/client.rb', line 1208
def process_addresses(addresses, ) if addresses.is_a?(String) process_addresses_string(addresses, ) else process_addresses_array(addresses, ) end end
#process_addresses_array(addresses, options) ⇒ Hash
<:uri
, :addresses
, :options
> (private)
Attempts to parse the given list of addresses, using the provided options.
# File 'lib/mongo/client.rb', line 1252
def process_addresses_array(addresses, ) {}.tap do |processed| processed[:addresses] = addresses processed[: ] = addresses.each do |addr| if addr =~ /\Amongodb(\+srv)?:\/\//i raise ArgumentError, "Host '#{addr}' should not contain protocol. Did you mean to not use an array?" end end @srv_records = nil end end
#process_addresses_string(addresses, options) ⇒ Hash
<:uri
, :addresses
, :options
> (private)
Attempts to parse the given list of addresses, using the provided options.
# File 'lib/mongo/client.rb', line 1224
def process_addresses_string(addresses, ) {}.tap do |processed| processed[:uri] = uri = URI.get(addresses, ) processed[:addresses] = uri.servers = uri. .dup # Special handing for :write and :write_concern: allow client Ruby # options to override URI options, even when the Ruby option uses the # deprecated :write key and the URI option uses the current # :write_concern key if [:write] .delete(:write_concern) end processed[: ] = .merge( ) @srv_records = uri.srv_records end end
#read_concern ⇒ Hash
Get the read concern for this client.
# File 'lib/mongo/client.rb', line 839
def read_concern [:read_concern] end
#read_preference ⇒ BSON::Document
Get the read preference from the options passed to the client.
# File 'lib/mongo/client.rb', line 722
def read_preference @read_preference ||= [:read] end
#read_retry_interval ⇒ Float
Get the interval, in seconds, in which read retries when using legacy read retries.
# File 'lib/mongo/client.rb', line 652
def read_retry_interval [:read_retry_interval] || Cluster::READ_RETRY_INTERVAL end
#reconnect ⇒ true
Reconnect the client.
# File 'lib/mongo/client.rb', line 890
def reconnect addresses = cluster.addresses.map(&:to_s) @connect_lock.synchronize do do_close rescue nil @cluster = Cluster.new(addresses, monitoring, ) if @options[: ] build_encrypter end @closed = false end true end
#server_selector ⇒ Mongo::ServerSelector
Get the server selector. It either uses the read preference defined in the client options or defaults to a Primary server selector.
# File 'lib/mongo/client.rb', line 700
def server_selector @server_selector ||= if read_preference ServerSelector.get(read_preference) else ServerSelector.primary end end
#start_session(options = {}) ⇒ Session
A Session cannot be used by multiple threads at once; session objects are not thread-safe.
Start a session.
If the deployment does not support sessions, raises Mongo::Error::InvalidSession. This exception can also be raised when the driver is not connected to a data-bearing server, for example during failover.
# File 'lib/mongo/client.rb', line 1011
def start_session( = {}) session = get_session!( .merge(implicit: false)) if block_given? begin yield session ensure session.end_session end else session end end
#summary ⇒ String
The exact format and layout of the returned summary string is not part of the driver’s public API and may be changed at any time.
Get a summary of the client state.
# File 'lib/mongo/client.rb', line 686
def summary "#<Client cluster=#{cluster.summary}>" end
#timeout_ms ⇒ Integer
| nil
# File 'lib/mongo/client.rb', line 1184
def timeout_ms @options[:timeout_ms] end
#timeout_sec ⇒ Float
| nil
# File 'lib/mongo/client.rb', line 1190
def timeout_sec if timeout_ms.nil? nil else timeout_ms / 1_000.0 end end
#update_options(new_options) ⇒ Hash
Updates this client’s options from new_options, validating all options.
The new options may be transformed according to various rules. The final hash of options actually applied to the client is returned.
If options fail validation, this method may warn or raise an exception. If this method raises an exception, the client should be discarded (similarly to if a constructor raised an exception).
# File 'lib/mongo/client.rb', line 789
def ( ) = @options = self.class. ( || {}) ( ).tap do |opts| # Our options are frozen = @options.dup if [:write] && opts[:write_concern] .delete(:write) end if [:write_concern] && opts[:write] .delete(:write_concern) end .update(opts) @options = .freeze = @options[: ] != [: ] # If there are new auto_encryption_options, create a new encrypter. # Otherwise, allow the new client to share an encrypter with the # original client. # # If auto_encryption_options are nil, set @encrypter to nil, but do not # close the encrypter because it may still be used by the original client. if @options[: ] && @connect_lock.synchronize do build_encrypter end elsif @options[: ].nil? @connect_lock.synchronize do @encrypter = nil end end end end
#use(name) ⇒ Client
The new client shares the cluster with the original client, and as a result also shares the monitoring instance and monitoring event subscribers.
Creates a new client configured to use the database with the provided name, and using the other options configured in this client.
# File 'lib/mongo/client.rb', line 741
def use(name) with(database: name) end
#valid_compressors(compressors) (private)
# File 'lib/mongo/client.rb', line 1629
def valid_compressors(compressors) compressors.select do |compressor| if !VALID_COMPRESSORS.include?(compressor) log_warn("Unsupported compressor '#{compressor}' in list '#{compressors}'. " + "This compressor will not be used.") false else true end end end
#validate_authentication_options! (private)
Validates all authentication-related options after they are set on the client This method is intended to catch combinations of options which are not allowed
# File 'lib/mongo/client.rb', line 1574
def auth_mech = [:auth_mech] user = [:user] password = [:password] auth_source = [:auth_source] mech_properties = [:auth_mech_properties] if auth_mech.nil? if user && user.empty? raise Mongo::Auth::InvalidConfiguration, 'Empty username is not supported for default auth mechanism' end if auth_source == '' raise Mongo::Auth::InvalidConfiguration, 'Auth source cannot be empty for default auth mechanism' end return end if !Mongo::Auth::SOURCES.key?(auth_mech) raise Mongo::Auth::InvalidMechanism.new(auth_mech) end if user.nil? && !%i(aws mongodb_x509).include?(auth_mech) raise Mongo::Auth::InvalidConfiguration, "Username is required for auth mechanism #{auth_mech}" end if password.nil? && !%i(aws gssapi mongodb_x509).include?(auth_mech) raise Mongo::Auth::InvalidConfiguration, "Password is required for auth mechanism #{auth_mech}" end if password && auth_mech == :mongodb_x509 raise Mongo::Auth::InvalidConfiguration, 'Password is not supported for :mongodb_x509 auth mechanism' end if auth_mech == :aws && user && !password raise Mongo::Auth::InvalidConfiguration, 'Username is provided but password is not provided for :aws auth mechanism' end if %i(aws gssapi mongodb_x509).include?(auth_mech) if !['$external', nil].include?(auth_source) raise Mongo::Auth::InvalidConfiguration, "#{auth_source} is an invalid auth source for #{auth_mech}; valid options are $external and nil" end else # Auth source is the database name, and thus cannot be the empty string. if auth_source == '' raise Mongo::Auth::InvalidConfiguration, "Auth source cannot be empty for auth mechanism #{auth_mech}" end end if mech_properties && !%i(aws gssapi).include?(auth_mech) raise Mongo::Auth::InvalidConfiguration, ":mechanism_properties are not supported for auth mechanism #{auth_mech}" end end
#validate_max_connecting!(option, opts) ⇒ true
(private)
Validates whether the max_connecting option is valid.
# File 'lib/mongo/client.rb', line 1677
def validate_max_connecting!(option, opts) if option == :max_connecting && opts.key?(:max_connecting) max_connecting = opts[:max_connecting] || Server::ConnectionPool::DEFAULT_MAX_CONNECTING if max_connecting <= 0 raise Error::InvalidMaxConnecting.new(opts[:max_connecting]) end end true end
#validate_max_min_pool_size!(option, opts) (private)
# File 'lib/mongo/client.rb', line 1660
def validate_max_min_pool_size!(option, opts) if option == :min_pool_size && opts[:min_pool_size] max = opts[:max_pool_size] || Server::ConnectionPool::DEFAULT_MAX_SIZE if max != 0 && opts[:min_pool_size] > max raise Error::InvalidMinPoolSize.new(opts[:min_pool_size], max) end end true end
#validate_new_options!(opts) (private)
Validates options in the provided argument for validity. The argument may contain a subset of options that the client will eventually have; this method validates each of the provided options but does not check for interactions between combinations of options.
# File 'lib/mongo/client.rb', line 1365
def (opts) return Options::Redacted.new unless opts if opts[:read_concern] # Raise an error for non user-settable options if opts[:read_concern][:after_cluster_time] raise Mongo::Error::InvalidReadConcern.new( 'The after_cluster_time read_concern option cannot be specified by the user' ) end given_keys = opts[:read_concern].keys.map(&:to_s) allowed_keys = ['level'] invalid_keys = given_keys - allowed_keys # Warn that options are invalid but keep it and forward to the server unless invalid_keys.empty? log_warn("Read concern has invalid keys: #{invalid_keys.join(',')}.") end end if server_api = opts[:server_api] unless server_api.is_a?(Hash) raise ArgumentError, ":server_api value must be a hash: #{server_api}" end extra_keys = server_api.keys - %w(version strict deprecation_errors) unless extra_keys.empty? raise ArgumentError, "Unknown keys under :server_api: #{extra_keys.map(&:inspect).join(', ')}" end if version = server_api[:version] unless VALID_SERVER_API_VERSIONS.include?(version) raise ArgumentError, "Unknown server API version: #{version}" end end end Lint.validate_underscore_read_preference(opts[:read]) Lint.validate_read_concern_option(opts[:read_concern]) opts.each.inject(Options::Redacted.new) do |, (k, v)| key = k.to_sym if VALID_OPTIONS.include?(key) validate_max_min_pool_size!(key, opts) validate_max_connecting!(key, opts) validate_read!(key, opts) if key == :compressors compressors = valid_compressors(v) if compressors.include?('snappy') validate_snappy_compression! end if compressors.include?('zstd') validate_zstd_compression! end [key] = compressors unless compressors.empty? elsif key == :srv_max_hosts if v && (!v.is_a?(Integer) || v < 0) log_warn("#{v} is not a valid integer for srv_max_hosts") else [key] = v end else [key] = v end else log_warn("Unsupported client option '#{k}'. It will be ignored.") end end end
#validate_options!(addresses = nil, is_srv: nil) (private)
Validates all options after they are set on the client. This method is intended to catch combinations of options which are not allowed.
# File 'lib/mongo/client.rb', line 1440
def (addresses = nil, is_srv: nil) if [:write] && [:write_concern] && [:write] != [:write_concern] raise ArgumentError, "If :write and :write_concern are both given, they must be identical: #{ .inspect}" end connect = [:connect]&.to_sym if connect && !%i(direct replica_set sharded load_balanced).include?(connect) raise ArgumentError, "Invalid :connect option value: #{connect}" end if [:direct_connection] if connect && connect != :direct raise ArgumentError, "Conflicting client options: direct_connection=true and connect=#{connect}" end # When a new client is created, we get the list of seed addresses if addresses && addresses.length > 1 raise ArgumentError, "direct_connection=true cannot be used with multiple seeds" end # When a client is copied using #with, we have a cluster if cluster && !cluster.topology.is_a?(Mongo::Cluster::Topology::Single) raise ArgumentError, "direct_connection=true cannot be used with topologies other than Single (this client is #{cluster.topology.class.name.sub(/.*::/, '')})" end end if [:load_balanced] if addresses && addresses.length > 1 raise ArgumentError, "load_balanced=true cannot be used with multiple seeds" end if [:direct_connection] raise ArgumentError, "direct_connection=true cannot be used with load_balanced=true" end if connect && connect != :load_balanced raise ArgumentError, "connect=#{connect} cannot be used with load_balanced=true" end if [:replica_set] raise ArgumentError, "load_balanced=true cannot be used with replica_set option" end end if connect == :load_balanced if addresses && addresses.length > 1 raise ArgumentError, "connect=load_balanced cannot be used with multiple seeds" end if [:replica_set] raise ArgumentError, "connect=load_balanced cannot be used with replica_set option" end end if [:direct_connection] == false && connect && connect == :direct raise ArgumentError, "Conflicting client options: direct_connection=false and connect=#{connect}" end %i(connect_timeout socket_timeout).each do |key| if value = [key] unless Numeric === value raise ArgumentError, "#{key} must be a non-negative number: #{value}" end if value < 0 raise ArgumentError, "#{key} must be a non-negative number: #{value}" end end end if value = [:bg_error_backtrace] case value when Integer if value <= 0 raise ArgumentError, ":bg_error_backtrace option value must be true, false, nil or a positive integer: #{value}" end when true # OK else raise ArgumentError, ":bg_error_backtrace option value must be true, false, nil or a positive integer: #{value}" end end if libraries = [:wrapping_libraries] unless Array === libraries raise ArgumentError, ":wrapping_libraries must be an array of hashes: #{libraries}" end libraries = libraries.map do |library| Utils.shallow_symbolize_keys(library) end libraries.each do |library| unless Hash === library raise ArgumentError, ":wrapping_libraries element is not a hash: #{library}" end if library.empty? raise ArgumentError, ":wrapping_libraries element is empty" end unless (library.keys - %i(name platform version)).empty? raise ArgumentError, ":wrapping_libraries element has invalid keys (allowed keys: :name, :platform, :version): #{library}" end library.each do |key, value| if value.include?('|') raise ArgumentError, ":wrapping_libraries element value cannot include '|': #{value}" end end end end if [:srv_max_hosts] && [:srv_max_hosts] > 0 if [:replica_set] raise ArgumentError, ":srv_max_hosts > 0 cannot be used with :replica_set option" end if [:load_balanced] raise ArgumentError, ":srv_max_hosts > 0 cannot be used with :load_balanced=true" end end unless is_srv.nil? || is_srv if [:srv_max_hosts] raise ArgumentError, ":srv_max_hosts cannot be used on non-SRV URI" end if [:srv_service_name] raise ArgumentError, ":srv_service_name cannot be used on non-SRV URI" end end end
#validate_read!(option, opts) (private)
# File 'lib/mongo/client.rb', line 1687
def validate_read!(option, opts) if option == :read && opts.has_key?(:read) read = opts[:read] # We could check if read is a Hash, but this would fail # for custom classes implementing key access ([]). # Instead reject common cases of strings and symbols. if read.is_a?(String) || read.is_a?(Symbol) raise Error::InvalidReadOption.new(read, "the read preference must be specified as a hash: { mode: #{read.inspect} }") end if mode = read[:mode] mode = mode.to_sym unless Mongo::ServerSelector::PREFERENCES.include?(mode) raise Error::InvalidReadOption.new(read, "mode #{mode} is not one of recognized modes") end end end true end
#validate_snappy_compression! (private)
# File 'lib/mongo/client.rb', line 1642
def validate_snappy_compression! return if defined?(Snappy) require 'snappy' rescue LoadError => e raise Error::UnmetDependency, "Cannot enable snappy compression because the snappy gem " \ "has not been installed. Add \"gem 'snappy'\" to your Gemfile and run " \ "\"bundle install\" to install the gem. (#{e.class}: #{e})" end
#validate_zstd_compression! (private)
# File 'lib/mongo/client.rb', line 1651
def validate_zstd_compression! return if defined?(Zstd) require 'zstd-ruby' rescue LoadError => e raise Error::UnmetDependency, "Cannot enable zstd compression because the zstd-ruby gem " \ "has not been installed. Add \"gem 'zstd-ruby'\" to your Gemfile and run " \ "\"bundle install\" to install the gem. (#{e.class}: #{e})" end
#watch(pipeline = [], options = {}) ⇒ ChangeStream
A change stream only allows ‘majority’ read concern.
This helper method is preferable to running a raw aggregation with a $changeStream stage, for the purpose of supporting resumability.
As of version 3.6 of the MongoDB server, a “$changeStream“ pipeline stage is supported in the aggregation framework. As of version 4.0, this stage allows users to request that notifications are sent for all changes that occur in the client’s cluster.
# File 'lib/mongo/client.rb', line 1088
def watch(pipeline = [], = {}) return use(Database::ADMIN).watch(pipeline, ) unless database.name == Database::ADMIN = .dup [:cursor_type] = :tailable_await if [:max_await_time_ms] Mongo::Collection::View::ChangeStream.new( Mongo::Collection::View.new(self["#{Database::COMMAND}.aggregate"], {}, ), pipeline, Mongo::Collection::View::ChangeStream::CLUSTER, ) end
#with(new_options = nil) ⇒ Client
Depending on options given, the returned client may share the cluster with the original client or be created with a new cluster. If a new cluster is created, the monitoring event subscribers on the new client are set to the default event subscriber set and none of the subscribers on the original client are copied over.
Creates a new client with the passed options merged over the existing options of this client. Useful for one-offs to change specific options without altering the original client.
# File 'lib/mongo/client.rb', line 763
def with( = nil) clone.tap do |client| opts = client. ( || Options::Redacted.new) Database.create(client) # We can't use the same cluster if some options that would affect it # have changed. if (opts) Cluster.create(client, monitoring: opts[:monitoring]) end end end
#with_session(options = {}, &block)
Creates a session to use for operations if possible and yields it to the provided block.
If :session
option is set, validates that session and uses it. Otherwise, if deployment supports sessions, creates a new session and uses it. When a new session is created, the session will be implicit (lifecycle is managed by the driver) if the :implicit
option is given, otherwise the session will be explicit (lifecycle managed by the application). If deployment does not support session, yields nil to the block.
When the block finishes, if the session was created and was implicit, or if an implicit session was passed in, the session is ended which returns it to the pool of available sessions.
# File 'lib/mongo/client.rb', line 1144
def with_session( = {}, &block) # TODO: Add this back in RUBY-3174. # assert_not_closed session = get_session( ) yield session ensure if session && session.implicit? session.end_session end end
#write_concern ⇒ Mongo::WriteConcern
Get the write concern for this client. If no option was provided, then a default single server acknowledgement will be used.
# File 'lib/mongo/client.rb', line 852
def write_concern @write_concern ||= WriteConcern.get( [:write_concern] || [:write]) end