123456789_123456789_123456789_123456789_123456789_

Class: Gem::GemcutterUtilities::WebauthnListener

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(host) ⇒ WebauthnListener

[ GitHub ]

  
# File 'lib/rubygems/gemcutter_utilities/webauthn_listener.rb', line 31

# Builds a listener bound to +host+.
#
# @param host the value stored in @host and exposed through the +host+ reader
def initialize(host)
  @host = host
end

Class Method Details

.listener_thread(host, server)

[ GitHub ]

  
# File 'lib/rubygems/gemcutter_utilities/webauthn_listener.rb', line 35

# Spawns a thread that waits on +server+ for the OTP code.
#
# On success the code is stored in the thread-local +:otp+; a
# Gem::WebauthnVerificationError is captured in the thread-local +:error+
# instead of being raised. Any other exception is left unhandled, and
# because abort_on_exception is enabled it aborts the whole process rather
# than dying silently. +server+ is closed in every case.
#
# @param host passed to the listener constructor
# @param server object responding to +accept+ and +close+
# @return [Thread] the started listener thread
def self.listener_thread(host, server)
  Thread.new do
    begin
      current = Thread.current
      current.report_on_exception = false
      current.abort_on_exception = true
      current[:otp] = new(host).wait_for_otp_code(server)
    rescue Gem::WebauthnVerificationError => verification_error
      current[:error] = verification_error
    ensure
      server.close
    end
  end
end

Instance Attribute Details

#host (readonly)

[ GitHub ]

  
# File 'lib/rubygems/gemcutter_utilities/webauthn_listener.rb', line 29

# Read-only accessor for the host given to the constructor.
attr_reader :host

Instance Method Details

#parse_otp_from_uri(uri) (private)

[ GitHub ]

  
# File 'lib/rubygems/gemcutter_utilities/webauthn_listener.rb', line 87

# Extracts the OTP carried in the +code+ query parameter of +uri+.
#
# The query string is decoded with Gem::URI.decode_www_form instead of
# CGI.parse, so the method does not depend on the full +cgi+ library
# (newer Rubies only ship +cgi/escape+ by default).
#
# @param uri parsed request URI; must respond to +query+
# @return [String, nil] the percent-decoded value of the first +code+
#   parameter, or nil when there is no query, no +code+ parameter, or the
#   value is empty
def parse_otp_from_uri(uri)
  query = uri.query
  return if query.nil?

  otp = Gem::URI.decode_www_form(query).assoc("code")&.last
  # An empty string is truthy in Ruby; treat "?code=" as a missing OTP so the
  # caller does not accept it as a real code.
  otp unless otp.nil? || otp.empty?
end

#root_path?(uri) ⇒ Boolean (private)

[ GitHub ]

  
# File 'lib/rubygems/gemcutter_utilities/webauthn_listener.rb', line 83

# Tells whether the request targets the root path.
#
# @param uri parsed request URI; must respond to +path+
# @return [Boolean] true only when the path is exactly "/"
def root_path?(uri)
  request_path = uri.path
  request_path == "/"
end

#wait_for_otp_code(server)

[ GitHub ]

  
# File 'lib/rubygems/gemcutter_utilities/webauthn_listener.rb', line 48

# Accepts connections on +server+ until one request settles the flow.
#
# Each accepted connection is read for its HTTP request line only:
# * OPTIONS on "/" is answered with NoContentResponse and the loop keeps
#   waiting for the follow-up request.
# * GET on "/" with a +code+ query parameter is answered with OkResponse
#   and the code is returned.
# * Anything else is answered with the matching error response and raises.
#
# @param server object whose +accept+ returns a socket-like object
# @return [String] the OTP taken from the request's +code+ parameter
# @raise [Gem::WebauthnVerificationError] when the connection carries no
#   request, the request line is malformed, the path is not "/", a GET has
#   no OTP, or the HTTP method is neither OPTIONS nor GET
def wait_for_otp_code(server)
  loop do
    socket = server.accept
    request_line = socket.gets

    # gets returns nil when the peer disconnects without sending a line.
    # There is nothing to answer, so release the socket and report the
    # failure as a verification error instead of crashing on nil.split.
    if request_line.nil?
      socket.close
      raise Gem::WebauthnVerificationError, "Received an empty request."
    end

    method, req_uri, _protocol = request_line.split(" ")

    # A request line needs at least a method and a target to be routed.
    if method.nil? || req_uri.nil?
      SocketResponder.new(socket).send(BadRequestResponse.for(host))
      raise Gem::WebauthnVerificationError, "Received a malformed request line."
    end

    req_uri = Gem::URI.parse(req_uri)

    responder = SocketResponder.new(socket)

    unless root_path?(req_uri)
      responder.send(NotFoundResponse.for(host))
      raise Gem::WebauthnVerificationError, "Page at #{req_uri.path} not found."
    end

    case method.upcase
    when "OPTIONS"
      responder.send(NoContentResponse.for(host))
      next # will be GET
    when "GET"
      if otp = parse_otp_from_uri(req_uri)
        responder.send(OkResponse.for(host))
        return otp
      end
      responder.send(BadRequestResponse.for(host))
      raise Gem::WebauthnVerificationError, "Did not receive OTP from #{host}."
    else
      responder.send(MethodNotAllowedResponse.for(host))
      raise Gem::WebauthnVerificationError, "Invalid HTTP method #{method.upcase} received."
    end
  end
end