Module: PG::Connection::Pollable
Relationships & Source Files | |
Extension / Inclusion / Inheritance Descendants | |
Included In:
| |
Defined in: | lib/pg/connection.rb |
Instance Method Summary
-
#polling_loop(poll_meth)
private
Track the progress of the connection, waiting for the socket to become readable/writable before polling it.
-
#remove_current_host(iopts)
private
Remove the host to which the connection is currently established from the option hash.
Instance Method Details
#polling_loop(poll_meth) (private)
Track the progress of the connection, waiting for the socket to become readable/writable before polling it.
Connecting to multiple hosts is done like so:
-
All hosts are passed to PG::Connection.connect_start
-
As soon as the host is tried to connect the related host is removed from the hosts list
-
When the polling status changes to
PG::PGRES_POLLING_OK
the connection is returned and ready to use. -
When the polling status changes to
PG::PGRES_POLLING_FAILED
connecting is aborted and a::PG::ConnectionBad
is raised with details to all connection attepts. -
When a timeout occurs, connecting is restarted with the remaining hosts.
The downside is that this connects only once to hosts which are listed twice when they timeout.
# File 'lib/pg/connection.rb', line 686
private def polling_loop(poll_meth) connect_timeout = conninfo_hash[:connect_timeout] if (timeo = connect_timeout.to_i) && timeo > 0 host_count = (conninfo_hash[:hostaddr].to_s.empty? ? conninfo_hash[:host] : conninfo_hash[:hostaddr]).to_s.count(",") + 1 stop_time = timeo * host_count + Process.clock_gettime(Process::CLOCK_MONOTONIC) end iopts = conninfo_hash.compact connection_errors = [] poll_status = PG::PGRES_POLLING_WRITING until poll_status == PG::PGRES_POLLING_OK || poll_status == PG::PGRES_POLLING_FAILED # Set single timeout to parameter "connect_timeout" but # don't exceed total connection time of number-of-hosts * connect_timeout. timeout = [timeo, stop_time - Process.clock_gettime(Process::CLOCK_MONOTONIC)].min if stop_time hostcnt = remove_current_host(iopts) event = if !timeout || timeout >= 0 # If the socket needs to read, wait 'til it becomes readable to poll again case poll_status when PG::PGRES_POLLING_READING if defined?(IO::READABLE) # ruby-3.0+ socket_io.wait(IO::READABLE | IO::PRIORITY, timeout) else IO.select([socket_io], nil, [socket_io], timeout) end # ...and the same for when the socket needs to write when PG::PGRES_POLLING_WRITING if defined?(IO::WRITABLE) # ruby-3.0+ # Use wait instead of wait_readable, since connection errors are delivered as # exceptional/priority events on Windows. socket_io.wait(IO::WRITABLE | IO::PRIORITY, timeout) else # io#wait on ruby-2.x doesn't wait for priority, so fallback to IO.select IO.select(nil, [socket_io], [socket_io], timeout) end end end # connection to server at "localhost" (127.0.0.1), port 5433 failed: timeout expired (PG::ConnectionBad) # connection to server on socket "/var/run/postgresql/.s.PGSQL.5433" failed: No such file or directory unless event connection_errors << ( + "timeout expired") if hostcnt > 0 reset_start2(self.class.parse_connect_args(iopts)) # Restart polling with waiting for writable. # Otherwise "not connected" error is raised on Windows. poll_status = PG::PGRES_POLLING_WRITING next else finish raise PG::ConnectionBad.new(connection_errors.join("\n").b, connection: self) end end # Check to see if it's finished or failed yet poll_status = send( poll_meth ) end unless status == PG::CONNECTION_OK msg = finish raise PG::ConnectionBad.new(connection_errors.map{|e| e + "\n" }.join.b + msg, connection: self) end end
#remove_current_host(iopts) (private)
Remove the host to which the connection is currently established from the option hash. Affected options are:
-
:host
-
:hostaddr
-
:port
Return the number of remaining hosts.
# File 'lib/pg/connection.rb', line 762
private def remove_current_host(iopts) ihosts = iopts[:host]&.split(",", -1) ihostaddrs = iopts[:hostaddr]&.split(",", -1) iports = iopts[:port]&.split(",", -1) iports = iports * (ihosts || ihostaddrs || [1]).size if iports&.size == 1 idx = (ihosts || ihostaddrs || iports).index.with_index do |_, i| (ihosts ? ihosts[i] == host : true) && (ihostaddrs && respond_to?(:hostaddr, true) ? ihostaddrs[i] == hostaddr : true) && (iports ? iports[i].to_i == port : true) end if idx ihosts&.delete_at(idx) ihostaddrs&.delete_at(idx) iports&.delete_at(idx) iopts.merge!( host: ihosts.join(",")) if ihosts iopts.merge!( hostaddr: ihostaddrs.join(",")) if ihostaddrs iopts.merge!( port: iports.join(",")) if iports end (ihosts || ihostaddrs || iports).size end