
Class: Mongo::ServerSelector::Base

Relationships & Source Files
Extension / Inclusion / Inheritance Descendants
Subclasses:
Inherits: Object
Defined in: lib/mongo/server_selector/base.rb

Overview

Since:

  • 2.0.0

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Instance Attribute Details

#hedge ⇒ Hash | nil (readonly)

Returns:

  • (Hash | nil)

    hedge The document specifying whether to enable hedged reads.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 76

attr_reader :hedge

#max_staleness ⇒ Integer (readonly)

Returns:

  • (Integer)

    max_staleness The maximum replication lag, in seconds, that a secondary can suffer and still be eligible for a read.

Since:

  • 2.4.0

# File 'lib/mongo/server_selector/base.rb', line 72

attr_reader :max_staleness

#options ⇒ Hash (readonly)

Returns:

  • (Hash)

    options The options.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 63

attr_reader :options

#tag_sets ⇒ Array (readonly)

Returns:

  • (Array)

    tag_sets The tag sets used to select servers.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 66

attr_reader :tag_sets

Instance Method Details

#==(other) ⇒ true, false

Check equality of two server selectors.

Examples:

Check server selector equality.

preference == other

Parameters:

  • other (Object)

    The other preference.

Returns:

  • (true, false)

    Whether the objects are equal.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 136

def ==(other)
  name == other.name && hedge == other.hedge &&
    max_staleness == other.max_staleness && tag_sets == other.tag_sets
end

#candidates(cluster) ⇒ Array<Server>

This method is for internal use only.

Returns servers of acceptable types from the cluster.

Does not perform staleness validation, staleness filtering or latency filtering.

Parameters:

  • cluster (Cluster)

    The cluster.

Returns:

  • (Array<Server>)

    The candidate servers.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 403

def candidates(cluster)
  servers = cluster.servers
  servers.each do |server|
    validate_max_staleness_support!(server)
  end
  if cluster.single?
    servers
  elsif cluster.sharded?
    servers
  elsif cluster.replica_set?
    select_in_replica_set(servers)
  else
    # Unknown cluster - no servers
    []
  end
end

#filter_stale_servers(candidates, primary = nil) (private)

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 572

def filter_stale_servers(candidates, primary = nil)
  return candidates unless @max_staleness

  # last_scan is filled out by the Monitor, and can be nil if a server
  # had its description manually set rather than being normally updated
  # via the SDAM flow. We don't handle the possibility of a nil
  # last_scan here.
  if primary
    candidates.select do |server|
      validate_max_staleness_support!(server)
      staleness = (server.last_scan - server.last_write_date) -
                  (primary.last_scan - primary.last_write_date)  +
                  server.cluster.heartbeat_interval
      staleness <= @max_staleness
    end
  else
    max_write_date = candidates.collect(&:last_write_date).max
    candidates.select do |server|
      validate_max_staleness_support!(server)
      staleness = max_write_date - server.last_write_date + server.cluster.heartbeat_interval
      staleness <= @max_staleness
    end
  end
end
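
The two staleness estimates above can be worked through with plain numbers. The following is a minimal sketch, not driver code: `FakeServer`, `staleness_with_primary` and `staleness_without_primary` are hypothetical names, and the heartbeat interval is passed in explicitly instead of being read from the cluster.

```ruby
# Hypothetical stand-in for a server; only the two fields the formula reads.
FakeServer = Struct.new(:last_scan, :last_write_date)

# Estimate used when the primary is known: the secondary's lag behind its
# own last scan, minus the primary's, plus one heartbeat interval.
def staleness_with_primary(server, primary, heartbeat_interval)
  (server.last_scan - server.last_write_date) -
    (primary.last_scan - primary.last_write_date) +
    heartbeat_interval
end

# Estimate used when no primary is known: compare against the most recent
# write date seen on any candidate.
def staleness_without_primary(server, candidates, heartbeat_interval)
  max_write_date = candidates.map(&:last_write_date).max
  max_write_date - server.last_write_date + heartbeat_interval
end

primary = FakeServer.new(1000.0, 995.0)
lagging = FakeServer.new(1000.0, 960.0)
fresher = FakeServer.new(1000.0, 990.0)

puts staleness_with_primary(lagging, primary, 10)               # 45.0
puts staleness_without_primary(lagging, [lagging, fresher], 10) # 40.0
```

With a max_staleness of 40, the lagging secondary would be kept in the second case and rejected in the first.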

#full_doc ⇒ Hash (private)

Convert this server preference definition into a format appropriate for sending to a MongoDB server (i.e., as a command field).

Returns:

  • (Hash)

    The server preference formatted as a command field value.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 469

def full_doc
  @full_doc ||= begin
    preference = { :mode => self.class.const_get(:SERVER_FORMATTED_NAME) }
    preference.update(tags: tag_sets) unless tag_sets.empty?
    preference.update(maxStalenessSeconds: max_staleness) if max_staleness
    preference.update(hedge: hedge) if hedge
    preference
  end
end
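
To illustrate which keys end up in the resulting document, here is a standalone sketch that mirrors the logic above without the driver. `read_preference_doc` is a hypothetical helper, and the mode strings passed to it are illustrative values, not values read from the driver's constants.

```ruby
# Build a read preference document: the mode is always present, the other
# keys are added only when the corresponding setting is in use.
def read_preference_doc(mode, tag_sets: [], max_staleness: nil, hedge: nil)
  doc = { mode: mode }
  doc[:tags] = tag_sets unless tag_sets.empty?
  doc[:maxStalenessSeconds] = max_staleness if max_staleness
  doc[:hedge] = hedge if hedge
  doc
end

p read_preference_doc('primary')
p read_preference_doc('secondary', tag_sets: [{ 'dc' => 'ny' }], max_staleness: 120)
```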

#inspect ⇒ String

Inspect the server selector.

Examples:

Inspect the server selector.

selector.inspect

Returns:

  • (String)

    The inspection.

Since:

  • 2.2.0

# File 'lib/mongo/server_selector/base.rb', line 122

def inspect
  "#<#{self.class.name}:0x#{object_id} tag_sets=#{tag_sets.inspect} max_staleness=#{max_staleness.inspect} hedge=#{hedge}>"
end

#local_threshold ⇒ Float

Deprecated.

This setting is now taken from the cluster options when a server is selected. Will be removed in version 3.0.

Get the local threshold boundary for nearest selection in seconds.

Examples:

Get the local threshold.

selector.local_threshold

Returns:

  • (Float)

    The local threshold.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 105

def local_threshold
  @local_threshold ||= (options[:local_threshold] || ServerSelector::LOCAL_THRESHOLD)
end

#local_threshold_with_cluster(cluster)

This method is for internal use only.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 110

def local_threshold_with_cluster(cluster)
  options[:local_threshold] || cluster.options[:local_threshold] || LOCAL_THRESHOLD
end

#match_tag_sets(candidates) ⇒ Array (private)

Select the servers matching the defined tag sets.

Parameters:

  • candidates (Array)

    List of candidate servers from which those matching the defined tag sets should be selected.

Returns:

  • (Array)

    The servers matching the defined tag sets.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 563

def match_tag_sets(candidates)
  matches = []
  tag_sets.find do |tag_set|
    matches = candidates.select { |server| server.matches_tag_set?(tag_set) }
    !matches.empty?
  end
  matches || []
end
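
The method above uses the first tag set that matches at least one candidate and ignores later tag sets. A minimal sketch of that behavior follows. `TaggedServer` and `first_matching_tag_set` are hypothetical, and the `matches_tag_set?` stand-in assumes a server matches when it carries every key/value pair of the tag set.

```ruby
# Hypothetical stand-in for a server with replica set tags.
TaggedServer = Struct.new(:name, :tags) do
  # Assumption: a server matches when it has every pair in the tag set,
  # so an empty tag set matches any server.
  def matches_tag_set?(tag_set)
    tag_set.all? { |key, value| tags[key] == value }
  end
end

# Return the servers matching the first tag set that matches anything.
def first_matching_tag_set(candidates, tag_sets)
  tag_sets.each do |tag_set|
    matches = candidates.select { |server| server.matches_tag_set?(tag_set) }
    return matches unless matches.empty?
  end
  []
end

servers = [
  TaggedServer.new('a', { 'dc' => 'ny' }),
  TaggedServer.new('b', { 'dc' => 'sf' }),
]
tag_sets = [{ 'dc' => 'la' }, { 'dc' => 'sf' }, {}]

puts first_matching_tag_set(servers, tag_sets).map(&:name).inspect # ["b"]
```

The trailing empty tag set acts as a catch-all: it is only consulted when none of the earlier tag sets match.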

#near_servers(candidates = [], local_threshold = nil) ⇒ Array (private)

Select the near servers from a list of provided candidates, taking the local threshold into account.

Parameters:

  • candidates (Array) (defaults to: [])

    List of candidate servers to select the near servers from.

  • local_threshold (Integer) (defaults to: nil)

    Local threshold. This parameter will be required in driver version 3.0.

Returns:

  • (Array)

    The near servers.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 523

def near_servers(candidates = [], local_threshold = nil)
  return candidates if candidates.empty?

  # Average RTT on any server may change at any time by the server
  # monitor's background thread. ARTT may also become nil if the
  # server is marked unknown. Take a snapshot of ARTTs for the duration
  # of this method.

  candidates = candidates.map do |server|
    {server: server, artt: server.average_round_trip_time}
  end.reject do |candidate|
    candidate[:artt].nil?
  end

  return candidates if candidates.empty?

  nearest_candidate = candidates.min_by do |candidate|
    candidate[:artt]
  end

  # Default for legacy signature
  local_threshold ||= self.local_threshold

  threshold = nearest_candidate[:artt] + local_threshold

  candidates.select do |candidate|
    candidate[:artt] <= threshold
  end.map do |candidate|
    candidate[:server]
  end.shuffle!
end
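
The latency window can be illustrated in isolation. This sketch assumes a hypothetical `TimedServer` struct and an illustrative threshold of 0.015 seconds. Unlike the method above, it does not shuffle the result, so the output is deterministic.

```ruby
# Hypothetical stand-in for a server with a measured average round trip time.
TimedServer = Struct.new(:name, :average_round_trip_time)

# Keep the servers whose average RTT is within local_threshold seconds of
# the fastest server. Servers without a measurement are dropped.
def within_latency_window(candidates, local_threshold)
  measured = candidates.reject { |server| server.average_round_trip_time.nil? }
  return [] if measured.empty?

  fastest = measured.map(&:average_round_trip_time).min
  measured.select do |server|
    server.average_round_trip_time <= fastest + local_threshold
  end
end

servers = [
  TimedServer.new('a', 0.010),
  TimedServer.new('b', 0.020),
  TimedServer.new('c', 0.040),
  TimedServer.new('d', nil),
]

puts within_latency_window(servers, 0.015).map(&:name).inspect # ["a", "b"]
```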

#primary(candidates) ⇒ Array (private)

Select the primary from a list of provided candidates.

Parameters:

  • candidates (Array)

    List of candidate servers to select the primary from.

Returns:

  • (Array)

    The primary.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 487

def primary(candidates)
  candidates.select do |server|
    server.primary?
  end
end

#secondaries(candidates) ⇒ Array (private)

Select the secondaries from a list of provided candidates.

Parameters:

  • candidates (Array)

    List of candidate servers to select the secondaries from.

Returns:

  • (Array)

    The secondary servers.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 501

def secondaries(candidates)
  matching_servers = candidates.select(&:secondary?)
  matching_servers = filter_stale_servers(matching_servers, primary(candidates).first)
  matching_servers = match_tag_sets(matching_servers) unless tag_sets.empty?
  # Per server selection spec the server selected MUST be a random
  # one matching staleness and latency requirements.
  # Selectors always pass the output of #secondaries to #nearest
  # which shuffles the server list, fulfilling this requirement.
  matching_servers
end

#select_server(cluster, ping = nil, session = nil, write_aggregation: false, deprioritized: [], timeout: nil) ⇒ Mongo::Server

Select a server from the specified cluster, taking into account mongos pinning for the specified session.

If the session is given and has a pinned server, this server is the only server considered for selection. If the server is of type mongos, it is returned immediately; otherwise monitoring checks on this server are initiated to update its status, and if the server becomes a mongos within the server selection timeout, it is returned.

If no session is given or the session does not have a pinned server, the normal server selection process is performed among all servers in the specified cluster matching the preference of this server selector object. Monitoring checks are initiated on servers in the cluster until a suitable server is found, up to the server selection timeout.

If a suitable server is not found within the server selection timeout, this method raises ::Mongo::Error::NoServerAvailable.

Parameters:

  • cluster (Mongo::Cluster)

    The cluster from which to select an eligible server.

  • ping (true, false) (defaults to: nil)

    Whether to ping the server before selection. Deprecated and ignored.

  • session (Session | nil) (defaults to: nil)

    Optional session to take into account for mongos pinning. Added in version 2.10.0.

  • write_aggregation (true | false)

    Whether we need a server that supports writing aggregations (e.g. with $merge/$out) on secondaries.

  • deprioritized (Array<Server>)

    A list of servers that should be selected from only if no other servers are available. This is used to avoid selecting the same server twice in a row when retrying a command.

  • timeout (Float | nil)

    Timeout in seconds for the operation, if any.

Returns:

  • (Mongo::Server)

    The selected server.

Raises:

  • (Error::NoServerAvailable)

    No server was found matching the specified preference / pinning requirement in the server selection timeout.

  • (Error::LintError)

    An unexpected condition was detected, and lint mode is enabled.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 183

def select_server(
  cluster,
  ping = nil,
  session = nil,
  write_aggregation: false,
  deprioritized: [],
  timeout: nil
)
  select_server_impl(cluster, ping, session, write_aggregation, deprioritized, timeout).tap do |server|
    if Lint.enabled? && !server.pool.ready?
      raise Error::LintError, 'Server selector returning a server with a pool which is not ready'
    end
  end
end

#select_server_impl(cluster, ping, session, write_aggregation, deprioritized, csot_timeout) (private)

Parameters and return values are the same as for select_server, except that the timeout parameter is named csot_timeout.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 200

private def select_server_impl(cluster, ping, session, write_aggregation, deprioritized, csot_timeout)
  if cluster.topology.is_a?(Cluster::Topology::LoadBalanced)
    return cluster.servers.first
  end

  timeout = cluster.options[:server_selection_timeout] || SERVER_SELECTION_TIMEOUT

  server_selection_timeout = if csot_timeout && csot_timeout > 0
                               [timeout, csot_timeout].min
                             else
                               timeout
                             end

  # Special handling for zero timeout: if we have to select a server,
  # and the timeout is zero, fail immediately (since server selection
  # will take some non-zero amount of time in any case).
  if server_selection_timeout == 0
    msg = "Failing server selection due to zero timeout. " +
      " Requested #{name} in cluster: #{cluster.summary}"
    raise Error::NoServerAvailable.new(self, cluster, msg)
  end

  deadline = Utils.monotonic_time + server_selection_timeout

  if session && session.pinned_server
    if Mongo::Lint.enabled?
      unless cluster.sharded?
        raise Error::LintError, "Session has a pinned server in a non-sharded topology: #{topology}"
      end
    end

    if !session.in_transaction?
      session.unpin
    end

    if server = session.pinned_server
      # Here we assume that a mongos stays in the topology indefinitely.
      # This will no longer be the case once SRV polling is implemented.

      unless server.mongos?
        while (time_remaining = deadline - Utils.monotonic_time) > 0
          wait_for_server_selection(cluster, time_remaining)
        end

        unless server.mongos?
          msg = "The session being used is pinned to the server which is not a mongos: #{server.summary} " +
            "(after #{server_selection_timeout} seconds)"
          raise Error::NoServerAvailable.new(self, cluster, msg)
        end
      end

      return server
    end
  end

  if cluster.replica_set?
    validate_max_staleness_value_early!
  end

  if cluster.addresses.empty?
    if Lint.enabled?
      unless cluster.servers.empty?
        raise Error::LintError, "Cluster has no addresses but has servers: #{cluster.servers.map(&:inspect).join(', ')}"
      end
    end
    msg = "Cluster has no addresses, and therefore will never have a server"
    raise Error::NoServerAvailable.new(self, cluster, msg)
  end

=begin Add this check in version 3.0.0
  unless cluster.connected?
    msg = 'Cluster is disconnected'
    raise Error::NoServerAvailable.new(self, cluster, msg)
  end
=end

  loop do
    if Lint.enabled?
      cluster.servers.each do |server|
        # TODO: Add this back in RUBY-3174.
        # if !server.unknown? && !server.connected?
        #   raise Error::LintError, "Server #{server.summary} is known but is not connected"
        # end
        if !server.unknown? && !server.pool.ready?
          raise Error::LintError, "Server #{server.summary} is known but has non-ready pool"
        end
      end
    end

    server = try_select_server(cluster, write_aggregation: write_aggregation, deprioritized: deprioritized)

    if server
      unless cluster.topology.compatible?
        raise Error::UnsupportedFeatures, cluster.topology.compatibility_error.to_s
      end

      if session && session.starting_transaction? && cluster.sharded?
        session.pin_to_server(server)
      end

      return server
    end

    cluster.scan!(false)

    time_remaining = deadline - Utils.monotonic_time
    if time_remaining > 0
      wait_for_server_selection(cluster, time_remaining)

      # If we wait for server selection, perform another round of
      # attempting to locate a suitable server. Otherwise server selection
      # can raise NoServerAvailable message when the diagnostics
      # reports an available server of the requested type.
    else
      break
    end
  end

  msg = "No #{name} server"
  if is_a?(ServerSelector::Secondary) && !tag_sets.empty?
    msg += " with tag sets: #{tag_sets}"
  end
  msg += " is available in cluster: #{cluster.summary} " +
          "with timeout=#{server_selection_timeout}, " +
          "LT=#{local_threshold_with_cluster(cluster)}"
  msg += server_selection_diagnostic_message(cluster)
  raise Error::NoServerAvailable.new(self, cluster, msg)
rescue Error::NoServerAvailable => e
  if session && session.in_transaction? && !session.committing_transaction?
    e.add_label('TransientTransactionError')
  end
  if session && session.committing_transaction?
    e.add_label('UnknownTransactionCommitResult')
  end
  raise e
end
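
Two parts of the implementation above are easy to study on their own: how the effective timeout is derived, and how the retry loop is bounded by a monotonic deadline. The sketch below is illustrative only. `effective_selection_timeout` and `poll_until` are hypothetical helpers, and a short sleep stands in for waiting on the cluster.

```ruby
# The operation timeout only shortens the selection timeout when it is
# present and positive.
def effective_selection_timeout(cluster_timeout, csot_timeout)
  if csot_timeout && csot_timeout > 0
    [cluster_timeout, csot_timeout].min
  else
    cluster_timeout
  end
end

# Call the block repeatedly until it returns a truthy value or the
# deadline passes. Returns the block's value, or nil on timeout.
def poll_until(timeout, interval: 0.01)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    result = yield
    return result if result

    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return nil if remaining <= 0

    # Never sleep past the deadline.
    sleep [remaining, interval].min
  end
end

puts effective_selection_timeout(30, 5)   # 5
puts effective_selection_timeout(30, nil) # 30
```

A monotonic clock is used for the deadline so that wall clock adjustments cannot lengthen or shorten the wait.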

#server_selection_diagnostic_message(cluster) ⇒ String (private)

Creates a diagnostic message when server selection fails.

The diagnostic message includes the following information, as applicable:

  • Servers that have dead monitor threads

  • Whether the cluster is disconnected

If none of the conditions for diagnostic messages apply, an empty string is returned.

Parameters:

  • cluster (Cluster)

    The cluster on which server selection was performed.

Returns:

  • (String)

    The diagnostic message.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 695

def server_selection_diagnostic_message(cluster)
  msg = ''
  dead_monitors = []
  cluster.servers_list.each do |server|
    thread = server.monitor.instance_variable_get('@thread')
    if thread.nil? || !thread.alive?
      dead_monitors << server
    end
  end
  if dead_monitors.any?
    msg += ". The following servers have dead monitor threads: #{dead_monitors.map(&:summary).join(', ')}"
  end
  unless cluster.connected?
    msg += ". The cluster is disconnected (client may have been closed)"
  end
  msg
end

#server_selection_timeout ⇒ Float

Deprecated.

This setting is now taken from the cluster options when a server is selected. Will be removed in version 3.0.

Get the timeout for server selection.

Examples:

Get the server selection timeout, in seconds.

selector.server_selection_timeout

Returns:

  • (Float)

    The timeout.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 89

def server_selection_timeout
  @server_selection_timeout ||=
    (options[:server_selection_timeout] || ServerSelector::SERVER_SELECTION_TIMEOUT)
end

#suitable_server(servers, deprioritized) ⇒ Server | nil (private)

Returns a server from the list of servers that is suitable for executing the operation.

Parameters:

  • servers (Array<Server>)

    The candidate servers.

  • deprioritized (Array<Server>)

    A list of servers that should be selected from only if no other servers are available.

Returns:

  • (Server | nil)

    The suitable server or nil if no suitable server is available.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 454

def suitable_server(servers, deprioritized)
  preferred = servers - deprioritized
  if preferred.empty?
    servers.first
  else
    preferred.first
  end
end
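
The effect of the deprioritized list is easiest to see with plain values. In this sketch, strings stand in for server objects and `pick_server` is a hypothetical name for the same logic.

```ruby
# Prefer servers that are not deprioritized; fall back to the full list
# only when every candidate is deprioritized.
def pick_server(servers, deprioritized)
  preferred = servers - deprioritized
  (preferred.empty? ? servers : preferred).first
end

puts pick_server(%w[a b], %w[a]) # b
puts pick_server(%w[a], %w[a])   # a
```

Order is preserved in both cases, so any ordering imposed by the selector still decides which server comes first.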

#suitable_servers(cluster) ⇒ Array<Server>

This method is for internal use only.

Returns servers satisfying the server selector from the cluster.

Parameters:

  • cluster (Cluster)

    The cluster.

Returns:

  • (Array<Server>)

    The suitable servers.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 427

def suitable_servers(cluster)
  if cluster.single?
    candidates(cluster)
  elsif cluster.sharded?
    local_threshold = local_threshold_with_cluster(cluster)
    servers = candidates(cluster)
    near_servers(servers, local_threshold)
  elsif cluster.replica_set?
    validate_max_staleness_value!(cluster)
    candidates(cluster)
  else
    # Unknown cluster - no servers
    []
  end
end

#try_select_server(cluster, write_aggregation: false, deprioritized: []) ⇒ Server | nil

This method is for internal use only.

Tries to find a suitable server. Returns the server if one is available, or nil otherwise.

Parameters:

  • cluster (Mongo::Cluster)

    The cluster from which to select an eligible server.

  • write_aggregation (true | false)

    Whether we need a server that supports writing aggregations (e.g. with $merge/$out) on secondaries.

  • deprioritized (Array<Server>)

    A list of servers that should be selected from only if no other servers are available. This is used to avoid selecting the same server twice in a row when retrying a command.

Returns:

  • (Server | nil)

    A suitable server, if one exists.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 352

def try_select_server(cluster, write_aggregation: false, deprioritized: [])
  servers = if write_aggregation && cluster.replica_set?
    # 1. Check if ALL servers in cluster support secondary writes.
    is_write_supported = cluster.servers.reduce(true) do |res, server|
      res && server.features.merge_out_on_secondary_enabled?
    end

    if is_write_supported
      # 2. If all servers support secondary writes, we respect read preference.
      suitable_servers(cluster)
    else
      # 3. Otherwise we fallback to primary for replica set.
      [cluster.servers.detect(&:primary?)]
    end
  else
    suitable_servers(cluster)
  end

  # This list of servers may be ordered in a specific way
  # by the selector (e.g. for secondary preferred, the first
  # server may be a secondary and the second server may be primary)
  # and we should take the first server here respecting the order
  server = suitable_server(servers, deprioritized)

  if server
    if Lint.enabled?
      # It is possible for a server to have a nil average RTT here
      # because the ARTT comes from description which may be updated
      # by a background thread while server selection is running.
      # Currently lint mode is not a public feature, if/when this
      # changes (https://jira.mongodb.org/browse/RUBY-1576) the
      # requirement for ARTT to be not nil would need to be removed.
      if server.average_round_trip_time.nil?
        raise Error::LintError, "Server #{server.address} has nil average rtt"
      end
    end
  end

  server
end
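
The write aggregation branch above amounts to an all-or-nothing check. The sketch below shows that decision with a hypothetical `AggServer` struct. It uses `select` for the primary fallback, a simplification of the `detect` call above, so the fallback list is empty instead of containing nil when there is no primary.

```ruby
# Hypothetical stand-in: whether the server is primary and whether it
# supports $merge/$out on secondaries.
AggServer = Struct.new(:name, :primary, :supports_secondary_writes)

# If every server supports secondary writes, keep the servers chosen by the
# read preference; otherwise fall back to the primary.
def servers_for_write_aggregation(all_servers, preferred)
  if all_servers.all?(&:supports_secondary_writes)
    preferred
  else
    all_servers.select(&:primary)
  end
end

primary   = AggServer.new('p', true, true)
secondary = AggServer.new('s', false, true)
legacy    = AggServer.new('l', false, false)

puts servers_for_write_aggregation([primary, secondary], [secondary]).map(&:name).inspect         # ["s"]
puts servers_for_write_aggregation([primary, secondary, legacy], [secondary]).map(&:name).inspect # ["p"]
```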

#validate! (private)

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 597

def validate!
  if !@tag_sets.all? { |set| set.empty? } && !tags_allowed?
    raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_TAG_SUPPORT)
  elsif @max_staleness && !max_staleness_allowed?
    raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_MAX_STALENESS_SUPPORT)
  end

  if @hedge
    unless hedge_allowed?
      raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_HEDGE_SUPPORT)
    end

    unless @hedge.is_a?(Hash) && @hedge.key?(:enabled) &&
        [true, false].include?(@hedge[:enabled])
      raise Error::InvalidServerPreference.new(
        "`hedge` value (#{hedge}) is invalid - hedge must be a Hash in the " \
        "format { enabled: true }"
      )
    end
  end
end
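
The shape check applied to the hedge option can be exercised on its own. `valid_hedge?` below is a hypothetical predicate that mirrors the condition above; it does not raise and does not consult hedge_allowed?.

```ruby
# A hedge document is well-formed when it is a Hash with a symbol :enabled
# key whose value is exactly true or false.
def valid_hedge?(hedge)
  hedge.is_a?(Hash) && hedge.key?(:enabled) &&
    [true, false].include?(hedge[:enabled])
end

puts valid_hedge?({ enabled: true })      # true
puts valid_hedge?({ enabled: 'yes' })     # false
puts valid_hedge?({ 'enabled' => true })  # false
```

Note that a string key is rejected by this check, since only the symbol key :enabled is looked up.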

#validate_max_staleness_support!(server) (private)

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 619

def validate_max_staleness_support!(server)
  if @max_staleness && !server.features.max_staleness_enabled?
    raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_MAX_STALENESS_WITH_LEGACY_SERVER)
  end
end

#validate_max_staleness_value!(cluster) (private)

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 635

def validate_max_staleness_value!(cluster)
  if @max_staleness
    heartbeat_interval = cluster.heartbeat_interval
    unless @max_staleness >= [
      SMALLEST_MAX_STALENESS_SECONDS,
      min_cluster_staleness = heartbeat_interval + Cluster::IDLE_WRITE_PERIOD_SECONDS,
    ].max
      msg = "`max_staleness` value (#{@max_staleness}) is too small - it must be at least " +
        "`Mongo::ServerSelector::SMALLEST_MAX_STALENESS_SECONDS` (#{ServerSelector::SMALLEST_MAX_STALENESS_SECONDS}) and (the cluster's heartbeat_frequency " +
        "setting + `Mongo::Cluster::IDLE_WRITE_PERIOD_SECONDS`) (#{min_cluster_staleness})"
      raise Error::InvalidServerPreference.new(msg)
    end
  end
end
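
The lower bound enforced above is the larger of two values. The sketch below works through it with assumed constants: 90 and 10 are illustrative stand-ins for `SMALLEST_MAX_STALENESS_SECONDS` and `Cluster::IDLE_WRITE_PERIOD_SECONDS`, not values read from the driver, and the helper names are hypothetical.

```ruby
# Assumed values for illustration only; check the driver for the real ones.
ASSUMED_SMALLEST_MAX_STALENESS = 90
ASSUMED_IDLE_WRITE_PERIOD = 10

# Smallest max_staleness accepted for a given heartbeat interval, in seconds.
def minimum_max_staleness(heartbeat_interval)
  [
    ASSUMED_SMALLEST_MAX_STALENESS,
    heartbeat_interval + ASSUMED_IDLE_WRITE_PERIOD,
  ].max
end

def max_staleness_acceptable?(max_staleness, heartbeat_interval)
  max_staleness >= minimum_max_staleness(heartbeat_interval)
end

puts minimum_max_staleness(10)           # 90
puts minimum_max_staleness(120)          # 130
puts max_staleness_acceptable?(100, 120) # false
```

Under these assumptions, a long heartbeat interval raises the minimum above the fixed floor, which is why the check needs the cluster and cannot be done entirely up front.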

#validate_max_staleness_value_early! (private)

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 625

def validate_max_staleness_value_early!
  if @max_staleness
    unless @max_staleness >= SMALLEST_MAX_STALENESS_SECONDS
      msg = "`max_staleness` value (#{@max_staleness}) is too small - it must be at least " +
        "`Mongo::ServerSelector::SMALLEST_MAX_STALENESS_SECONDS` (#{ServerSelector::SMALLEST_MAX_STALENESS_SECONDS})"
      raise Error::InvalidServerPreference.new(msg)
    end
  end
end

#wait_for_server_selection(cluster, time_remaining) (private)

Waits for server state changes in the specified cluster.

If the cluster has a server selection semaphore, waits on that semaphore for up to the smaller of 0.5 seconds and the specified remaining time. Any change in server state resulting from SDAM will immediately wake up this method and cause it to return.

If the cluster does not have a server selection semaphore, waits the smaller of 0.25 seconds and the specified remaining time. This functionality is provided for backwards compatibility only for applications directly invoking the server selection process. If lint mode is enabled and the cluster does not have a server selection semaphore, ::Mongo::Error::LintError will be raised.

Parameters:

  • cluster (Cluster)

    The cluster to wait for.

  • time_remaining (Numeric)

    Maximum time to wait, in seconds.

Since:

  • 2.0.0

# File 'lib/mongo/server_selector/base.rb', line 666

def wait_for_server_selection(cluster, time_remaining)
  if cluster.server_selection_semaphore
    # Since the semaphore may have been signaled between us checking
    # the servers list earlier and the wait call below, we should not
    # wait for the full remaining time - wait for up to 0.5 second, then
    # recheck the state.
    cluster.server_selection_semaphore.wait([time_remaining, 0.5].min)
  else
    if Lint.enabled?
      raise Error::LintError, 'Waiting for server selection without having a server selection semaphore'
    end
    sleep [time_remaining, 0.25].min
  end
end