123456789_123456789_123456789_123456789_123456789_

Module: PG::Connection::Pollable

Relationships & Source Files
Extension / Inclusion / Inheritance Descendants
Included In:
Defined in: lib/pg/connection.rb

Instance Method Summary

  • #polling_loop(poll_meth) private

    Track the progress of the connection, waiting for the socket to become readable/writable before polling it.

  • #remove_current_host(iopts) private

    Remove the host to which the connection is currently established from the option hash.

Instance Method Details

#polling_loop(poll_meth) (private)

Track the progress of the connection, waiting for the socket to become readable/writable before polling it.

Connecting to multiple hosts is done like so:

  • All hosts are passed to PG::Connection.connect_start

  • As soon as a connection attempt to a host is started, that host is removed from the hosts list

  • When the polling status changes to PG::PGRES_POLLING_OK the connection is returned and ready to use.

  • When the polling status changes to PG::PGRES_POLLING_FAILED connecting is aborted and a ::PG::ConnectionBad is raised with details of all connection attempts.

  • When a timeout occurs, connecting is restarted with the remaining hosts.

The downside is that this connects only once to hosts which are listed twice when they time out.

[ GitHub ]

  
# File 'lib/pg/connection.rb', line 686

# Track the progress of the connection, waiting for the socket to become
# readable/writable before polling it.
#
# +poll_meth+ is the name of the method that is invoked through #send to advance
# the connection; its result is compared against the PG::PGRES_POLLING_* constants.
#
# Raises PG::ConnectionBad when a wait times out and no further host is left to
# try, or when the loop ends while #status is not PG::CONNECTION_OK. The message
# includes the errors collected from all timed-out attempts.
private def polling_loop(poll_meth)
	connect_timeout = conninfo_hash[:connect_timeout]
	# nil.to_i is 0, so a missing or non-positive connect_timeout means no deadline at all.
	if (timeo = connect_timeout.to_i) && timeo > 0
		# Count the comma separated entries of :hostaddr, or of :host when :hostaddr is empty.
		host_count = (conninfo_hash[:hostaddr].to_s.empty? ? conninfo_hash[:host] : conninfo_hash[:hostaddr]).to_s.count(",") + 1
		# Overall deadline on the monotonic clock: one connect_timeout per listed host.
		stop_time = timeo * host_count + Process.clock_gettime(Process::CLOCK_MONOTONIC)
	end
	# Copy of the connection options without nil values. It is handed to
	# remove_current_host on every iteration and to parse_connect_args on restart.
	iopts = conninfo_hash.compact
	# One message per attempt that timed out.
	connection_errors = []
	# Start by waiting for the socket to become writable.
	poll_status = PG::PGRES_POLLING_WRITING

	until poll_status == PG::PGRES_POLLING_OK ||
			poll_status == PG::PGRES_POLLING_FAILED

		# Set single timeout to parameter "connect_timeout" but
		# don't exceed total connection time of number-of-hosts * connect_timeout.
		# timeout stays nil (wait without limit) when no deadline was computed above.
		timeout = [timeo, stop_time - Process.clock_gettime(Process::CLOCK_MONOTONIC)].min if stop_time

		# Drop the host that is currently being tried from iopts, so that a restart
		# only uses the remaining hosts. hostcnt is the number of hosts left.
		hostcnt = remove_current_host(iopts)

		# A negative timeout means the overall deadline has already passed: skip the
		# wait, which leaves event nil and triggers the timeout handling below.
		event = if !timeout || timeout >= 0
			# If the socket needs to read, wait 'til it becomes readable to poll again
			case poll_status
			when PG::PGRES_POLLING_READING
				if defined?(IO::READABLE) # ruby-3.0+
					socket_io.wait(IO::READABLE | IO::PRIORITY, timeout)
				else
					IO.select([socket_io], nil, [socket_io], timeout)
				end

			# ...and the same for when the socket needs to write
			when PG::PGRES_POLLING_WRITING
				if defined?(IO::WRITABLE) # ruby-3.0+
					# Use wait instead of wait_readable, since connection errors are delivered as
					# exceptional/priority events on Windows.
					socket_io.wait(IO::WRITABLE | IO::PRIORITY, timeout)
				else
					# io#wait on ruby-2.x doesn't wait for priority, so fallback to IO.select
					IO.select(nil, [socket_io], [socket_io], timeout)
				end
			end
		end

		# Examples of the message prefixes that "timeout expired" is appended to:
		# connection to server at "localhost" (127.0.0.1), port 5433 failed: timeout expired (PG::ConnectionBad)
		# connection to server on socket "/var/run/postgresql/.s.PGSQL.5433" failed: No such file or directory
		unless event
			# No event was reported: record this attempt as timed out.
			connection_errors << (error_message + "timeout expired")
			if hostcnt > 0
				# There are hosts left: restart connecting with the reduced option set.
				reset_start2(self.class.parse_connect_args(iopts))
				# Restart polling with waiting for writable.
				# Otherwise "not connected" error is raised on Windows.
				poll_status = PG::PGRES_POLLING_WRITING
				next
			else
				# No host left: give up and report every collected timeout.
				finish
				raise PG::ConnectionBad.new(connection_errors.join("\n").b, connection: self)
			end
		end

		# Check to see if it's finished or failed yet
		poll_status = send( poll_meth )
	end

	# The loop ended with either OK or FAILED; everything but CONNECTION_OK is an error.
	unless status == PG::CONNECTION_OK
		# Read the error message before finish is called on the connection.
		msg = error_message
		finish
		# Earlier timeout messages come first, one per line, followed by the final error.
		raise PG::ConnectionBad.new(connection_errors.map{|e| e + "\n" }.join.b + msg, connection: self)
	end
end

#remove_current_host(iopts) (private)

Remove the host to which the connection is currently established from the option hash. Affected options are:

  • :host

  • :hostaddr

  • :port

Return the number of remaining hosts.

[ GitHub ]

  
# File 'lib/pg/connection.rb', line 762

# Remove the host to which the connection is currently established from the
# option hash +iopts+.
#
# The comma separated lists in +iopts[:host]+, +iopts[:hostaddr]+ and
# +iopts[:port]+ are searched for the first entry that matches the values of
# #host, #hostaddr (only if that method is available) and #port. The matching
# entry is deleted from every list that is present and the shortened lists are
# written back to +iopts+ in place. +iopts+ is left untouched if nothing matches.
#
# Returns the number of remaining hosts. This is 0 when +iopts+ contains none
# of the three options.
private def remove_current_host(iopts)
	# A limit of -1 keeps empty entries (like in "a,,b"), so positions stay aligned.
	ihosts = iopts[:host]&.split(",", -1)
	ihostaddrs = iopts[:hostaddr]&.split(",", -1)
	iports = iopts[:port]&.split(",", -1)
	# A single port is valid for all hosts, so repeat it once per host entry.
	iports = iports * (ihosts || ihostaddrs || [1]).size if iports&.size == 1

	# The first list that is present determines which entries are scanned.
	entries = ihosts || ihostaddrs || iports
	# Without any host related option there is nothing to remove and no host
	# left to try. Guard here instead of failing with NoMethodError on nil.
	return 0 unless entries

	# Lists that are absent don't restrict the match.
	idx = entries.each_index.find do |i|
		(ihosts ? ihosts[i] == host : true) &&
		(ihostaddrs && respond_to?(:hostaddr, true) ? ihostaddrs[i] == hostaddr : true) &&
		(iports ? iports[i].to_i == port : true)
	end

	if idx
		ihosts&.delete_at(idx)
		ihostaddrs&.delete_at(idx)
		iports&.delete_at(idx)

		iopts[:host] = ihosts.join(",") if ihosts
		iopts[:hostaddr] = ihostaddrs.join(",") if ihostaddrs
		iopts[:port] = iports.join(",") if iports
	end

	# entries is the very array that was shortened above, so this is the remaining count.
	entries.size
end