Class: Mongo::Server::PushMonitor Private

Do not use. This class is for internal use only.
Relationships & Source Files
Namespace Children
Classes:
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
self, Forwardable
Instance Chain:
Inherits: Object
Defined in: lib/mongo/server/push_monitor.rb,
lib/mongo/server/push_monitor/connection.rb

Overview

A monitor utilizing server-pushed hello requests.

When a Monitor handshakes with a 4.4+ server, it creates an instance of PushMonitor. PushMonitor subsequently executes server-pushed hello (i.e. awaited & exhausted hello) to receive topology changes from the server as quickly as possible. The Monitor still monitors the server for round-trip time calculations and to perform immediate checks as requested by the application.
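The streaming behavior described above can be illustrated with a small simulation. This is only a sketch: `Reply`, `FakeStream` and `StreamingChecker` are hypothetical stand-ins that do not exist in the driver, and no network I/O takes place. It shows that the client sends a request only when the server is not already pushing replies, which it infers from a "more to come" flag on the previous reply.

```ruby
# Hypothetical stand-ins; none of these names come from the driver.
Reply = Struct.new(:document, :more_to_come)

class FakeStream
  attr_reader :writes

  def initialize(replies)
    @replies = replies.dup
    @writes = 0
  end

  # Called when the client has to (re)issue the hello request.
  def write_request
    @writes += 1
  end

  # Returns the next queued reply, as if it had arrived on the socket.
  def read_reply
    reply = @replies.shift
    raise IOError, 'no more replies queued' if reply.nil?
    reply
  end
end

class StreamingChecker
  def initialize(stream)
    @stream = stream
    @server_pushing = false
  end

  def check
    # Only write a request when the server is not already streaming replies.
    @stream.write_request unless @server_pushing
    reply = @stream.read_reply
    @server_pushing = reply.more_to_come
    reply.document
  end
end

stream = FakeStream.new([
  Reply.new({ 'ok' => 1, 'seq' => 1 }, true),
  Reply.new({ 'ok' => 1, 'seq' => 2 }, true),
  Reply.new({ 'ok' => 1, 'seq' => 3 }, false), # server ends the stream
  Reply.new({ 'ok' => 1, 'seq' => 4 }, true),
])
checker = StreamingChecker.new(stream)
documents = Array.new(4) { checker.check }
puts "replies read: #{documents.length}, requests written: #{stream.writes}"
```

Four replies are consumed with two requests: one to open the stream and one to reopen it after the server cleared the flag.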

Since:

  • 2.0.0

Constant Summary

::Mongo::Loggable - Included

PREFIX

Class Method Summary

Instance Attribute Summary

::Mongo::BackgroundThread - Included

Instance Method Summary

::Mongo::BackgroundThread - Included

#run!

Start the background thread.

#stop!

Stop the background thread and wait for it to terminate for a reasonable amount of time.

#do_work

Override this method to do the work in the background thread.

#pre_stop

Override this method to perform additional signaling for the background thread to stop.

#start!,
#wait_for_stop

Waits for the thread to die, with a timeout.

::Mongo::Loggable - Included

#log_debug

Convenience method to log debug messages with the standard prefix.

#log_error

Convenience method to log error messages with the standard prefix.

#log_fatal

Convenience method to log fatal messages with the standard prefix.

#log_info

Convenience method to log info messages with the standard prefix.

#log_warn

Convenience method to log warn messages with the standard prefix.

#logger

Get the logger instance.

#_mongo_log_prefix, #format_message

Instance Attribute Details

#monitor ⇒ Server (readonly)

Returns:

  • (Server)

    The server that is being monitored.

Since:

  • 2.0.0

# File 'lib/mongo/server/push_monitor.rb', line 55

attr_reader :monitor

#monitoring ⇒ Monitoring (readonly)

Returns:

  • (Monitoring)

Since:

  • 2.0.0

# File 'lib/mongo/server/push_monitor.rb', line 61

attr_reader :monitoring

#options ⇒ Hash (readonly)

Returns:

  • (Hash)

    Push monitor options.

Since:

  • 2.0.0

# File 'lib/mongo/server/push_monitor.rb', line 64

attr_reader :options

#topology_version ⇒ TopologyVersion (readonly)

Returns:

  • (TopologyVersion)

Since:

  • 2.0.0

# File 'lib/mongo/server/push_monitor.rb', line 58

attr_reader :topology_version

Instance Method Details

#check

Since:

  • 2.0.0

# File 'lib/mongo/server/push_monitor.rb', line 137

def check
  @lock.synchronize do
    if @connection && @connection.pid != Process.pid
      log_warn("Detected PID change - Mongo client should have been reconnected (old pid #{@connection.pid}, new pid #{Process.pid})")
      @connection.disconnect!
      @connection = nil
    end
  end

  @lock.synchronize do
    unless @connection
      @server_pushing = false
      connection = PushMonitor::Connection.new(server.address, options)
      connection.connect!
      @connection = connection
    end
  end

  resp_msg = begin
    unless @server_pushing
      write_check_command
    end
    read_response
  rescue Mongo::Error
    @lock.synchronize do
      @connection.disconnect!
      @connection = nil
    end
    raise
  end
  @server_pushing = resp_msg.flags.include?(:more_to_come)
  result = Operation::Result.new(resp_msg)
  result.validate!
  result.documents.first
end
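The first two synchronized blocks of `#check` discard a connection that was created in a different process and then lazily create a new one. A minimal sketch of that pattern follows. `FakeConnection` and `ForkAwareHolder` are hypothetical names, and the current PID is passed in as a parameter (an assumption made purely so the behavior can be shown without calling `fork`).

```ruby
# Hypothetical connection that remembers which process created it.
FakeConnection = Struct.new(:pid, :connected) do
  def disconnect!
    self.connected = false
  end
end

class ForkAwareHolder
  def initialize
    @lock = Mutex.new
    @connection = nil
  end

  # current_pid is injectable so the example runs without forking.
  def ensure_connection(current_pid = Process.pid)
    @lock.synchronize do
      # A connection inherited from a parent process must not be reused.
      if @connection && @connection.pid != current_pid
        @connection.disconnect!
        @connection = nil
      end
      @connection ||= FakeConnection.new(current_pid, true)
    end
  end
end

holder = ForkAwareHolder.new
parent_conn = holder.ensure_connection(100)
same_conn   = holder.ensure_connection(100) # same PID: connection is reused
child_conn  = holder.ensure_connection(200) # PID changed: reconnect
puts "reused: #{parent_conn.equal?(same_conn)}, replaced: #{!parent_conn.equal?(child_conn)}"
```

Sockets shared across a fork would interleave traffic from both processes, which is why the stale connection is dropped rather than reused.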

#do_work

Since:

  • 2.0.0

# File 'lib/mongo/server/push_monitor.rb', line 94

def do_work
  @lock.synchronize do
    return if @stop_requested
  end

  result = monitoring.publish_heartbeat(server, awaited: true) do
    check
  end
  new_description = monitor.run_sdam_flow(result, awaited: true)
  # When hello fails due to a fail point, the response does not
  # include topology version. In this case we need to keep our existing
  # topology version so that we can resume monitoring.
  # The spec does not appear to directly address this case but
  # https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-monitoring.rst#streaming-ismaster
  # says that topologyVersion should only be updated from successful
  # hello responses.
  if new_description.topology_version
    @topology_version = new_description.topology_version
  end
rescue IOError, SocketError, SystemCallError, Mongo::Error => exc
  stop_requested = @lock.synchronize { @stop_requested }
  if stop_requested
    # Ignore the exception, see RUBY-2771.
    return
  end

  msg = "Error running awaited hello on #{server.address}"
  Utils.warn_bg_exception(msg, exc,
    logger: options[:logger],
    log_prefix: options[:log_prefix],
    bg_error_backtrace: options[:bg_error_backtrace],
  )

  # If a request failed on a connection, stop push monitoring.
  # In case the server is dead we don't want to have two connections
  # trying to connect unsuccessfully at the same time.
  stop!

  # Request an immediate check on the monitor to get reinstated as
  # soon as possible in case the server is actually alive.
  server.scan_semaphore.signal
end
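The topology version handling in `#do_work` reduces to one rule: replace the stored version only when the new response carries one. The helper below is a hypothetical illustration using plain hashes, not the driver's description or `TopologyVersion` objects.

```ruby
# Hypothetical helper: keep the current topology version unless the
# response supplies a new one (failed hello responses may omit it).
def next_topology_version(current, response)
  response['topologyVersion'] || current
end

current = { 'processId' => 'abc', 'counter' => 3 }

failed   = { 'ok' => 0, 'errmsg' => 'fail point triggered' }
succeeded = { 'ok' => 1, 'topologyVersion' => { 'processId' => 'abc', 'counter' => 4 } }

after_failure = next_topology_version(current, failed)
after_success = next_topology_version(after_failure, succeeded)
puts after_failure['counter']
puts after_success['counter']
```

Keeping the old value after a failure means the next awaited hello can still send a valid `topologyVersion`, so monitoring can resume.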

#read_response

Since:

  • 2.0.0

# File 'lib/mongo/server/push_monitor.rb', line 184

def read_response
  if timeout = options[:connect_timeout]
    if timeout < 0
      raise Error::SocketTimeoutError, "Requested to read with a negative timeout: #{timeout}"
    elsif timeout > 0
      timeout += options[:heartbeat_frequency] || Monitor::DEFAULT_HEARTBEAT_INTERVAL
    end
  end
  # We set the timeout twice: once passed into read_socket which applies
  # to each individual read operation, and again around the entire read.
  Timeout.timeout(timeout, Error::SocketTimeoutError, "Failed to read an awaited hello response in #{timeout} seconds") do
    @lock.synchronize { @connection }.read_response(socket_timeout: timeout)
  end
end
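The timeout arithmetic in `#read_response` can be isolated into a small function. This is a sketch under stated assumptions: `awaited_read_timeout` is a hypothetical name, the default heartbeat interval of 10 seconds is assumed here rather than read from `Monitor::DEFAULT_HEARTBEAT_INTERVAL`, and `ArgumentError` stands in for the driver's socket timeout error class.

```ruby
# Assumed default, in seconds; the driver reads this from a constant.
DEFAULT_HEARTBEAT_INTERVAL = 10

# Hypothetical helper mirroring the branches in #read_response:
#   nil      -> nil (no timeout configured)
#   negative -> error
#   zero     -> 0 (left unchanged)
#   positive -> connect timeout plus the heartbeat interval
def awaited_read_timeout(connect_timeout, heartbeat_frequency = nil)
  return nil if connect_timeout.nil?
  if connect_timeout < 0
    raise ArgumentError, "Requested to read with a negative timeout: #{connect_timeout}"
  end
  return connect_timeout if connect_timeout.zero?

  connect_timeout + (heartbeat_frequency || DEFAULT_HEARTBEAT_INTERVAL)
end

puts awaited_read_timeout(10, 5) # => 15
puts awaited_read_timeout(10)    # => 20
```

The heartbeat interval is added because the server may legitimately hold an awaited hello for that long before replying, so the read must be allowed to wait at least that long on top of the connect timeout.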

#start!

Since:

  • 2.0.0

# File 'lib/mongo/server/push_monitor.rb', line 69

def start!
  @lock.synchronize do
    super
  end
end

#stop!

Since:

  • 2.0.0

# File 'lib/mongo/server/push_monitor.rb', line 75

def stop!
  @lock.synchronize do
    @stop_requested = true
    if @connection
      # Interrupt any in-progress exhausted hello reads by
      # disconnecting the connection.
      @connection.send(:socket).close rescue nil
    end
  end
  super.tap do
    @lock.synchronize do
      if @connection
        @connection.disconnect!
        @connection = nil
      end
    end
  end
end
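`#stop!` relies on the fact that closing a socket from one thread interrupts a read that is blocked on it in another thread. The sketch below demonstrates the same idea with a plain `IO.pipe` instead of a driver connection. All names are illustrative, and the short `sleep` is only there to make it likely that the worker is already blocked when the close happens.

```ruby
reader, writer = IO.pipe
lock = Mutex.new
stop_requested = false

worker = Thread.new do
  begin
    reader.read(1) # blocks: nothing is ever written to the pipe
    :got_data
  rescue IOError, SystemCallError
    # Mirror #do_work: an error after a stop request is expected and ignored.
    lock.synchronize { stop_requested } ? :stopped_cleanly : :failed
  end
end

sleep 0.1 # give the worker time to block in read

# Set the flag first, then interrupt the read by closing the IO.
lock.synchronize { stop_requested = true }
reader.close

outcome = worker.value
writer.close
puts outcome
```

Setting the stop flag before closing matters: the worker checks it inside the `rescue`, so the resulting exception is treated as an orderly shutdown rather than a monitoring failure.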

#to_s

Since:

  • 2.0.0

# File 'lib/mongo/server/push_monitor.rb', line 199

def to_s
  "#<#{self.class.name}:#{object_id} #{server.address}>"
end

#write_check_command

Since:

  • 2.0.0

# File 'lib/mongo/server/push_monitor.rb', line 173

def write_check_command
  document = @check_document.merge(
    topologyVersion: topology_version.to_doc,
    maxAwaitTimeMS: monitor.heartbeat_interval * 1000,
  )
  command = Protocol::Msg.new(
    [:exhaust_allowed], {}, document.merge({'$db' => Database::ADMIN})
  )
  @lock.synchronize { @connection }.write_bytes(command.serialize.to_s)
end
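The document assembled by `#write_check_command` can be sketched with plain hashes. `build_awaited_hello` is a hypothetical helper, the base document `{ hello: 1 }` is an assumed stand-in for `@check_document`, and `'admin'` is assumed to be the value of `Database::ADMIN`. No wire-protocol serialization is attempted here.

```ruby
# Hypothetical helper: extend a base hello document with the fields that
# make it an awaited hello. heartbeat_interval is in seconds.
def build_awaited_hello(check_document, topology_version_doc, heartbeat_interval)
  check_document.merge({
    topologyVersion: topology_version_doc,
    # The server holds the reply for at most this many milliseconds.
    maxAwaitTimeMS: (heartbeat_interval * 1000).to_i,
    '$db' => 'admin', # assumed value of Database::ADMIN
  })
end

base = { hello: 1 }
topology_version = { 'processId' => 'abc', 'counter' => 3 }

command = build_awaited_hello(base, topology_version, 10)
puts command[:maxAwaitTimeMS] # => 10000
puts command['$db']           # => admin
```

Because `Hash#merge` returns a new hash, the base document is left untouched and can be reused for the next request.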