
Class: EventMachine::Protocols::Postgres3

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
self, Connection
Instance Chain:
self, PostgresPR, Connection
Inherits: EventMachine::Connection
Defined in: lib/em/protocols/postgres3.rb

Overview

PROVISIONAL IMPLEMENTATION of an evented Postgres client. This implements version 3 of the Postgres wire protocol, which will work with any Postgres version from roughly 7.4 onward.

Objective: we want to access Postgres databases without requiring threads. Until now this has been a problem because the Postgres client implementations have all made use of blocking I/O calls, which are incompatible with a thread-free evented model.

But rather than re-implement the Postgres Wire3 protocol, we're taking advantage of the existing postgres-pr library, which was originally written by Michael Neumann but (at this writing) appears to be no longer maintained. Still, it's in basically a production-ready state, and the wire protocol isn't that complicated anyway.

We're tucking in a bunch of require statements that may not be present in garden-variety EM installations. Until we find a good way to only require these if a program requires postgres, this file will need to be required explicitly.

We need to monkeypatch ::StringIO because it lacks the #readbytes method needed by postgres-pr. The StringIO monkeypatch is lifted from the standard library readbytes.rb, which adds #readbytes directly to class IO; StringIO is not a subclass of IO, so it does not inherit that method. The patched version raises an IOError instead of TruncatedDataException, since that exception is otherwise unused.
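As a rough illustration of what such a patch looks like, here is a minimal sketch of a #readbytes method on StringIO that reads exactly n bytes or raises. It follows the description above (IOError on a short read); the exact error messages are assumptions and may differ from the shipped file.

```ruby
require 'stringio'

# Sketch only: read exactly n bytes from the StringIO or raise.
class StringIO
  def readbytes(n)
    str = read(n)
    # For a positive n, read returns nil once the stream is exhausted.
    raise EOFError, "End of file reached" if str.nil?
    # A short read means the data was truncated.
    raise IOError, "data truncated" if str.bytesize < n
    str
  end
end

io = StringIO.new("abcdef")
first_four = io.readbytes(4)
```

A caller that asks for more bytes than remain gets an IOError, and a caller that reads from an exhausted stream gets an EOFError (itself a subclass of IOError).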

We cloned the handling of postgres messages from lib/postgres-pr/connection.rb in the postgres-pr library, and modified it for event-handling.

TODO: The password handling in dispatch_conn_message is totally incomplete.

We return Deferrables from the user-level operations surfaced by this interface. Experimentally, we're using the pattern of always passing a boolean value as the first argument of a deferrable callback to indicate success or failure. This replaces the traditional pattern of calling Deferrable#succeed or #fail and requiring the user to define both a callback and an errback.
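The boolean-first convention can be shown without a running reactor. The sketch below uses TinyDeferrable, a hypothetical stand-in written only for this example (it is not EventMachine's Deferrable), to show a single callback that branches on the status argument.

```ruby
# Hypothetical stand-in for a deferrable; NOT EventMachine's implementation.
# It supports only the two calls needed to show the convention.
class TinyDeferrable
  def callback(&blk)
    if @args
      blk.call(*@args)   # already completed: fire immediately
    else
      @callback = blk
    end
    self
  end

  def succeed(*args)
    @args = args
    @callback.call(*args) if @callback
  end
end

# One callback handles both outcomes by checking the leading boolean.
def describe_outcome(status, payload)
  status ? "ok: #{payload}" : "failed: #{payload}"
end

outcomes = []
d = TinyDeferrable.new
d.succeed false, "Operation already in progress"
d.callback { |status, payload| outcomes << describe_outcome(status, payload) }
```

The trade-off is that a caller who forgets to check the status will treat a failure as a success, since no errback is ever invoked.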

Usage

EM.run {
  db = EM.connect_unix_domain( "/tmp/.s.PGSQL.5432", EM::P::Postgres3 )
  db.connect( dbname, username, psw ).callback do |status|
    if status
      db.query( "select * from some_table" ).callback do |status, result, errors|
        if status
          result.rows.each do |row|
            p row
          end
        end
      end
    end
  end
}

Class Method Summary

Connection - Inherited

.new

Override .new so subclasses don't have to call super and can ignore connection-specific arguments.

Instance Attribute Summary

Connection - Inherited

#comm_inactivity_timeout

comm_inactivity_timeout returns the current value (float in seconds) of the inactivity-timeout property of network-connection and datagram-socket objects.

#comm_inactivity_timeout=

Allows you to set the inactivity-timeout property for a network connection or datagram socket.

#error?

Returns true if the connection is in an error state, false otherwise.

#notify_readable=

Watches connection for readability.

#notify_readable?,
#notify_writable=

Watches connection for writeability.

#notify_writable?

Returns true if the connection is being watched for writability.

#paused?,
#pending_connect_timeout

The duration after which a TCP connection in the connecting state will fail.

#pending_connect_timeout=

Sets the duration after which a TCP connection in a connecting state will fail.

#signature, #watch_only?

Instance Method Summary

Connection - Inherited

#associate_callback_target

conn_associate_callback_target.

#close_connection

EventMachine::Connection#close_connection is called only by user code, and never by the event loop.

#close_connection_after_writing
#connection_completed

Called by the event loop when a remote TCP connection attempt completes successfully.

#detach

Removes given connection from the event loop.

#disable_keepalive

t_disable_keepalive.

#enable_keepalive

t_enable_keepalive.

#get_cipher_bits, #get_cipher_name, #get_cipher_protocol,
#get_idle_time

The number of seconds since the last send/receive activity on this connection.

#get_outbound_data_size

conn_get_outbound_data_size.

#get_peer_cert

If TLS is active on the connection, returns the remote X509 certificate as a string, in the popular PEM format.

#get_peername

This method is used with stream-connections to obtain the identity of the remotely-connected peer.

#get_pid

Returns the PID (kernel process identifier) of a subprocess associated with this Connection object.

#get_proxied_bytes

The number of bytes proxied to another connection.

#get_sni_hostname, #get_sock_opt,
#get_sockname

Used with stream-connections to obtain the identity of the local side of the connection.

#get_status

Returns a subprocess exit status.

#initialize

Stubbed initialize so legacy superclasses can safely call super.

#original_method,
#pause

Pause a connection so that EventMachine#send_data and #receive_data events are not fired until #resume is called.

#post_init

Called by the event loop immediately after the network connection has been established, and before resumption of the network loop.

#proxy_completed

Called when the reactor has finished proxying all of the requested bytes.

#proxy_incoming_to

EventMachine::Connection#proxy_incoming_to is called only by user code.

#proxy_target_unbound

Called by the reactor after attempting to relay incoming data to a descriptor (set as a proxy target descriptor with EventMachine.enable_proxy) that has already been closed.

#receive_data

Called by the event loop whenever data has been received by the network connection.

#reconnect

Reconnect to a given host/port with the current instance.

#resume

Resume a connection's EventMachine#send_data and #receive_data events.

#send_data

Call this method to send data to the remote end of the network connection.

#send_datagram

Sends UDP messages.

#send_file_data

Like Connection#send_data, this sends data to the remote end of the network connection.

#set_sock_opt,
#ssl_handshake_completed

Called by ::EventMachine when the SSL/TLS handshake has been completed, as a result of calling #start_tls to initiate SSL/TLS on the connection.

#ssl_verify_peer

Called by ::EventMachine when :verify_peer => true has been passed to EventMachine#start_tls.

#start_tls

Call EventMachine#start_tls at any point to initiate TLS encryption on connected streams.

#stop_proxying

A helper method for EventMachine.disable_proxy

#stream_file_data

Open a file on the filesystem and send it to the remote peer.

#unbind

Called by the framework whenever a connection (either a server or client connection) is closed.

Constructor Details

.new ⇒ Postgres3

# File 'lib/em/protocols/postgres3.rb', line 109

def initialize
  @data = ""
  @params = {}
end

Instance Method Details

#connect(db, user, psw = nil)

# File 'lib/em/protocols/postgres3.rb', line 114

def connect db, user, psw=nil
  d = EM::DefaultDeferrable.new
  d.timeout 15

  if @pending_query || @pending_conn
    d.succeed false, "Operation already in progress"
  else
    @pending_conn = d
    prms = {"user"=>user, "database"=>db}
    @user = user
    if psw
      @password = psw
      #prms["password"] = psw
    end
    send_data PostgresPR::StartupMessage.new( 3 << 16, prms ).dump
  end

  d
end
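The `3 << 16` passed to StartupMessage encodes protocol version 3.0: the major version sits in the high 16 bits and the minor version in the low 16. As a hedged illustration of what ends up on the wire, the sketch below builds a startup packet by hand, following the version 3 protocol layout (Int32 length, Int32 version, null-terminated key/value pairs, final null byte). The helper name build_startup_message is hypothetical and is not part of postgres-pr.

```ruby
PROTOCOL_V3 = 3 << 16  # major 3 in the high 16 bits, minor 0 in the low 16

# Hypothetical helper, for illustration only.
def build_startup_message(params)
  body = [PROTOCOL_V3].pack("N")          # 32-bit big-endian version
  params.each do |key, value|
    body << key.b << "\0" << value.b << "\0"
  end
  body << "\0"                            # terminator after the last pair
  # The length field counts itself (4 bytes) plus the body.
  [body.bytesize + 4].pack("N") + body
end

startup = build_startup_message("user" => "alice", "database" => "test")
```

Unlike every later message, the startup packet has no leading type byte, which is why the length is the first field.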

#dispatch_conn_message(msg)

Cloned and modified from postgres-pr.

# File 'lib/em/protocols/postgres3.rb', line 179

def dispatch_conn_message msg
  case msg
  when AuthentificationClearTextPassword
    raise ArgumentError, "no password specified" if @password.nil?
    send_data PasswordMessage.new(@password).dump

  when AuthentificationCryptPassword
    raise ArgumentError, "no password specified" if @password.nil?
    send_data PasswordMessage.new(@password.crypt(msg.salt)).dump

  when AuthentificationMD5Password
    raise ArgumentError, "no password specified" if @password.nil?
    require 'digest/md5'

    m = Digest::MD5.hexdigest(@password + @user)
    m = Digest::MD5.hexdigest(m + msg.salt)
    m = 'md5' + m
    send_data PasswordMessage.new(m).dump

  when AuthentificationKerberosV4, AuthentificationKerberosV5, AuthentificationSCMCredential
    raise "unsupported authentification"

  when AuthentificationOk
  when ErrorResponse
    raise msg.field_values.join("\t")
  when NoticeResponse
    @notice_processor.call(msg) if @notice_processor
  when ParameterStatus
    @params[msg.key] = msg.value
  when BackendKeyData
    # TODO
    #p msg
  when ReadyForQuery
    # TODO: use transaction status
    pc,@pending_conn = @pending_conn,nil
    pc.succeed true
  else
    raise "unhandled message type"
  end
end
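The MD5 branch above computes a two-stage hash: first over the password concatenated with the user name, then over that hex digest concatenated with the server's salt, with the literal prefix "md5". Isolated as a standalone function, it might look like the following sketch; the function name md5_password_response and the sample credentials are made up for illustration.

```ruby
require 'digest/md5'

# Hypothetical helper mirroring the AuthentificationMD5Password branch.
# salt is the 4-byte value carried by the server's authentication request.
def md5_password_response(user, password, salt)
  inner = Digest::MD5.hexdigest(password + user)  # stage 1: password + user
  "md5" + Digest::MD5.hexdigest(inner + salt)     # stage 2: hex digest + salt
end

response = md5_password_response("alice", "secret", "\x01\x02\x03\x04".b)
```

The response is always 35 characters: the "md5" prefix followed by 32 hexadecimal digits.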

#dispatch_query_message(msg)

Cloned and modified from postgres-pr.

# File 'lib/em/protocols/postgres3.rb', line 221

def dispatch_query_message msg
  case msg
  when DataRow
    @r.rows << msg.columns
  when CommandComplete
    @r.cmd_tag = msg.cmd_tag
  when ReadyForQuery
    pq,@pending_query = @pending_query,nil
    pq.succeed true, @r, @e
  when RowDescription
    @r.fields = msg.fields
  when CopyInResponse
  when CopyOutResponse
  when EmptyQueryResponse
  when ErrorResponse
    # TODO
    @e << msg
  when NoticeResponse
    @notice_processor.call(msg) if @notice_processor
  else
    # TODO
  end
end

#query(sql)

# File 'lib/em/protocols/postgres3.rb', line 134

def query sql
  d = EM::DefaultDeferrable.new
  d.timeout 15

  if @pending_query || @pending_conn
    d.succeed false, "Operation already in progress"
  else
    @r = PostgresPR::Connection::Result.new
    @e = []
    @pending_query = d
    send_data PostgresPR::Query.dump(sql)
  end

  d
end
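For orientation, a simple Query message in the version 3 protocol is the type byte 'Q', an Int32 length that counts itself and the payload but not the type byte, and the SQL text terminated by a null byte. The sketch below builds one by hand; build_query_message is a hypothetical name, and this is not a copy of what PostgresPR::Query.dump does internally.

```ruby
# Hypothetical helper, for illustration only.
def build_query_message(sql)
  body = sql.b + "\0"                      # SQL text, null-terminated
  # Length covers the 4-byte length field plus the body, not the 'Q'.
  "Q".b + [body.bytesize + 4].pack("N") + body
end

query_packet = build_query_message("select 1")
```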

#receive_data(data)

# File 'lib/em/protocols/postgres3.rb', line 151

def receive_data data
  @data << data
  while @data.length >= 5
    pktlen = @data[1...5].unpack("N").first
    if @data.length >= (1 + pktlen)
      pkt = @data.slice!(0...(1+pktlen))
      m = StringIO.open( pkt, "r" ) {|io| PostgresPR::Message.read( io ) }
      if @pending_conn
        dispatch_conn_message m
      elsif @pending_query
        dispatch_query_message m
      else
        raise "Unexpected message from database"
      end
    else
      break # very important, break out of the while
    end
  end
end
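The framing logic in receive_data relies on every backend message starting with a one-byte type followed by a four-byte big-endian length that includes itself but not the type byte. The sketch below isolates that buffering step as a plain function so it can be tried without a connection. The name extract_packets is hypothetical, and the buffer is assumed to be a binary (ASCII-8BIT) string.

```ruby
# Hypothetical helper: pull every complete packet off the front of buffer,
# leaving any trailing partial packet in place for the next call.
def extract_packets(buffer)
  packets = []
  while buffer.bytesize >= 5                  # type byte + length field
    length = buffer.byteslice(1, 4).unpack("N").first
    break if buffer.bytesize < 1 + length     # packet not fully received yet
    packets << buffer.slice!(0, 1 + length)
  end
  packets
end

ready  = "Z".b + [5].pack("N") + "I"              # ReadyForQuery, 6 bytes
done   = "C".b + [13].pack("N") + "SELECT 1\0"    # CommandComplete, 14 bytes
buffer = ready + done.byteslice(0, 7)             # second packet arrives split
first_batch = extract_packets(buffer)
buffer << done.byteslice(7, done.bytesize - 7)
second_batch = extract_packets(buffer)
```

Breaking out of the loop on a partial packet matters: without it, the loop would spin forever on a buffer that cannot yet be consumed.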

#unbind

# File 'lib/em/protocols/postgres3.rb', line 172

def unbind
  if o = (@pending_query || @pending_conn)
    o.succeed false, "lost connection"
  end
end