Class: Mongo::Server::PushMonitor Private
Relationships & Source Files | |
Namespace Children | |
Classes:
| |
Super Chains via Extension / Inclusion / Inheritance | |
Class Chain:
self,
Forwardable
|
|
Instance Chain:
|
|
Inherits: | Object |
Defined in: | lib/mongo/server/push_monitor.rb, lib/mongo/server/push_monitor/connection.rb |
Overview
A monitor utilizing server-pushed hello requests.
When a Monitor
handshakes with a 4.4+ server, it creates an instance of PushMonitor
. PushMonitor
subsequently executes server-pushed hello (i.e. awaited & exhausted hello) to receive topology changes from the server as quickly as possible. The Monitor still monitors the server for round-trip time calculations and to perform immediate checks as requested by the application.
Constant Summary
::Mongo::Loggable
- Included
Class Method Summary
- .new(monitor, topology_version, monitoring, **options) ⇒ PushMonitor constructor Internal use only
Instance Attribute Summary
- #monitor ⇒ Server readonly Internal use only
- #monitoring ⇒ Monitoring readonly Internal use only
- #options ⇒ Hash readonly Internal use only
- #topology_version ⇒ TopologyVersion readonly Internal use only
::Mongo::BackgroundThread
- Included
Instance Method Summary
- #check Internal use only
- #do_work Internal use only
- #read_response Internal use only
- #start! Internal use only
- #stop! Internal use only
- #to_s Internal use only
- #write_check_command Internal use only
::Mongo::BackgroundThread
- Included
#run! | Start the background thread. |
#stop! | Stop the background thread and wait for to terminate for a reasonable amount of time. |
#do_work | Override this method to do the work in the background thread. |
#pre_stop | Override this method to perform additional signaling for the background thread to stop. |
#start!, | |
#wait_for_stop | Waits for the thread to die, with a timeout. |
::Mongo::Loggable
- Included
#log_debug | Convenience method to log debug messages with the standard prefix. |
#log_error | Convenience method to log error messages with the standard prefix. |
#log_fatal | Convenience method to log fatal messages with the standard prefix. |
#log_info | Convenience method to log info messages with the standard prefix. |
#log_warn | Convenience method to log warn messages with the standard prefix. |
#logger | Get the logger instance. |
#_mongo_log_prefix, #format_message |
Instance Attribute Details
#monitor ⇒ Server (readonly)
# File 'lib/mongo/server/push_monitor.rb', line 55
attr_reader :monitor
#monitoring ⇒ Monitoring (readonly)
# File 'lib/mongo/server/push_monitor.rb', line 61
attr_reader :monitoring
#options ⇒ Hash
(readonly)
# File 'lib/mongo/server/push_monitor.rb', line 64
attr_reader :
#topology_version ⇒ TopologyVersion (readonly)
# File 'lib/mongo/server/push_monitor.rb', line 58
attr_reader :topology_version
Instance Method Details
#check
# File 'lib/mongo/server/push_monitor.rb', line 137
def check @lock.synchronize do if @connection && @connection.pid != Process.pid log_warn("Detected PID change - Mongo client should have been reconnected (old pid #{@connection.pid}, new pid #{Process.pid}") @connection.disconnect! @connection = nil end end @lock.synchronize do unless @connection @server_pushing = false connection = PushMonitor::Connection.new(server.address, ) connection.connect! @connection = connection end end resp_msg = begin unless @server_pushing write_check_command end read_response rescue Mongo::Error @lock.synchronize do @connection.disconnect! @connection = nil end raise end @server_pushing = resp_msg.flags.include?(:more_to_come) result = Operation::Result.new(resp_msg) result.validate! result.documents.first end
#do_work
# File 'lib/mongo/server/push_monitor.rb', line 94
def do_work @lock.synchronize do return if @stop_requested end result = monitoring.publish_heartbeat(server, awaited: true) do check end new_description = monitor.run_sdam_flow(result, awaited: true) # When hello fails due to a fail point, the response does not # include topology version. In this case we need to keep our existing # topology version so that we can resume monitoring. # The spec does not appear to directly address this case but # https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-monitoring.md#streamable-hello-or-legacy-hello-command # says that topologyVersion should only be updated from successful # hello responses. if new_description.topology_version @topology_version = new_description.topology_version end rescue IOError, SocketError, SystemCallError, Mongo::Error => exc stop_requested = @lock.synchronize { @stop_requested } if stop_requested # Ignore the exception, see RUBY-2771. return end msg = "Error running awaited hello on #{server.address}" Utils.warn_bg_exception(msg, exc, logger: [:logger], log_prefix: [:log_prefix], bg_error_backtrace: [:bg_error_backtrace], ) # If a request failed on a connection, stop push monitoring. # In case the server is dead we don't want to have two connections # trying to connect unsuccessfully at the same time. stop! # Request an immediate check on the monitor to get reinstated as # soon as possible in case the server is actually alive. server.scan_semaphore.signal end
#read_response
# File 'lib/mongo/server/push_monitor.rb', line 184
def read_response if timeout = [:connect_timeout] if timeout < 0 raise Mongo::SocketTimeoutError, "Requested to read with a negative timeout: #{}" elsif timeout > 0 timeout += [:heartbeat_frequency] || Monitor::DEFAULT_HEARTBEAT_INTERVAL end end # We set the timeout twice: once passed into read_socket which applies # to each individual read operation, and again around the entire read. Timeout.timeout(timeout, Error::SocketTimeoutError, "Failed to read an awaited hello response in #{timeout} seconds") do @lock.synchronize { @connection }.read_response(socket_timeout: timeout) end end
#start!
# File 'lib/mongo/server/push_monitor.rb', line 69
def start! @lock.synchronize do super end end
#stop!
# File 'lib/mongo/server/push_monitor.rb', line 75
def stop! @lock.synchronize do @stop_requested = true if @connection # Interrupt any in-progress exhausted hello reads by # disconnecting the connection. @connection.send(:socket).close rescue nil end end super.tap do @lock.synchronize do if @connection @connection.disconnect! @connection = nil end end end end
#to_s
# File 'lib/mongo/server/push_monitor.rb', line 199
def to_s "#<#{self.class.name}:#{object_id} #{server.address}>" end
#write_check_command
# File 'lib/mongo/server/push_monitor.rb', line 173
def write_check_command document = @check_document.merge( topologyVersion: topology_version.to_doc, maxAwaitTimeMS: monitor.heartbeat_interval * 1000, ) command = Protocol::Msg.new( [:exhaust_allowed], {}, document.merge({'$db' => Database::ADMIN}) ) @lock.synchronize { @connection }.write_bytes(command.serialize.to_s) end