123456789_123456789_123456789_123456789_123456789_

Class: ActionCable::Connection::Base

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
Instance Chain:
Inherits: Object
Defined in: actioncable/lib/action_cable/connection/base.rb

Overview

For every WebSocket connection the Action Cable server accepts, a ::ActionCable::Connection object will be instantiated. This instance becomes the parent of all of the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions based on an identifier sent by the Action Cable consumer. The Connection itself does not deal with any specific application logic beyond authentication and authorization.

Here’s a basic example:

module ApplicationCable
  class Connection < ActionCable::Connection::Base
    identified_by :current_user

    def connect
      self.current_user = find_verified_user
      logger.add_tags current_user.name
    end

    def disconnect
      # Any cleanup work needed when the cable connection is cut.
    end

    private
      def find_verified_user
        User.find_by_identity(cookies.encrypted[:identity_id]) ||
          reject_unauthorized_connection
      end
  end
end

First, we declare that this connection can be identified by its current_user. This allows us to later be able to find all connections established for that current_user (and potentially disconnect them). You can declare as many identification indexes as you like. Declaring an identification means that an attr_accessor is automatically set for that key.

Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.

Finally, we add a tag to the connection-specific logger with the name of the current user to easily distinguish their messages in the log.

Pretty simple, eh?

Constant Summary

::ActiveSupport::Callbacks - Included

CALLBACK_FILTER_TYPES

Identification - Attributes & Methods

::ActiveSupport::Rescuable - Attributes & Methods

Class Method Summary

::ActiveSupport::DescendantsTracker - self

descendants

See additional method definition at line 104.

subclasses

See additional method definition at line 100.

clear, disable_clear!, reject!,
store_inherited

This is the only method that is not thread safe, but is only ever called during the eager loading phase.

Instance Attribute Summary

Callbacks - Included

Instance Method Summary

::ActiveSupport::Rescuable - Included

#rescue_with_handler

Delegates to the class method, but uses the instance as the subject for rescue_from handlers (method calls, instance_exec blocks).

#handler_for_rescue

Internal handler lookup.

::ActiveSupport::Callbacks - Included

#run_callbacks

Runs the callbacks for the given event.

#halted_callback_hook

A hook invoked every time a before callback is halted.

Authorization - Included

#reject_unauthorized_connection

Closes the WebSocket connection if it is open and returns an “unauthorized” reason.

InternalChannel - Included

Identification - Included

#connection_identifier

Return a single connection identifier that combines the value of all the registered identifiers into a single gid.

#connection_gid

Constructor Details

.new(server, env, coder: ActiveSupport::JSON) ⇒ Base

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 58

def initialize(server, env, coder: ActiveSupport::JSON)
  @server, @env, @coder = server, env, coder

  @worker_pool = server.worker_pool
  @logger = new_tagged_logger

  @websocket      = ActionCable::Connection::WebSocket.new(env, self, event_loop)
  @subscriptions  = ActionCable::Connection::Subscriptions.new(self)
  @message_buffer = ActionCable::Connection::MessageBuffer.new(self)

  @_internal_subscriptions = nil
  @started_at = Time.now
end

Class Attribute Details

.identifiers (rw)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/identification.rb', line 11

class_attribute :identifiers, default: Set.new

.identifiers?Boolean (rw)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/identification.rb', line 11

class_attribute :identifiers, default: Set.new

.rescue_handlers (rw)

[ GitHub ]

  
# File 'activesupport/lib/active_support/rescuable.rb', line 15

class_attribute :rescue_handlers, default: []

.rescue_handlers?Boolean (rw)

[ GitHub ]

  
# File 'activesupport/lib/active_support/rescuable.rb', line 15

class_attribute :rescue_handlers, default: []

Instance Attribute Details

#allow_request_origin?Boolean (readonly, private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 214

def allow_request_origin?
  return true if server.config.disable_request_forgery_protection

  proto = Rack::Request.new(env).ssl? ? "https" : "http"
  if server.config.allow_same_origin_as_host && env["HTTP_ORIGIN"] == "#{proto}://#{env['HTTP_HOST']}"
    true
  elsif Array(server.config.allowed_request_origins).any? { |allowed_origin|  allowed_origin === env["HTTP_ORIGIN"] }
    true
  else
    logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}")
    false
  end
end

#config (readonly)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 56

delegate :event_loop, :pubsub, :config, to: :server

#env (readonly)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 55

attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol

#event_loop (readonly)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 56

delegate :event_loop, :pubsub, :config, to: :server

#identifiers (rw)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/identification.rb', line 11

class_attribute :identifiers, default: Set.new

#identifiers?Boolean (rw)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/identification.rb', line 11

class_attribute :identifiers, default: Set.new

#logger (readonly)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 55

attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol

#message_buffer (readonly, private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 161

attr_reader :message_buffer

#protocol (readonly)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 55

attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol

#pubsub (readonly)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 56

delegate :event_loop, :pubsub, :config, to: :server

#rescue_handlers (rw)

[ GitHub ]

  
# File 'activesupport/lib/active_support/rescuable.rb', line 15

class_attribute :rescue_handlers, default: []

#rescue_handlers?Boolean (rw)

[ GitHub ]

  
# File 'activesupport/lib/active_support/rescuable.rb', line 15

class_attribute :rescue_handlers, default: []

#server (readonly)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 55

attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol

#subscriptions (readonly)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 55

attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol

#websocket (readonly, private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 160

attr_reader :websocket

#worker_pool (readonly)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 55

attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol

Instance Method Details

#beat

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 134

def beat
  transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i
end

#close(reason: nil, reconnect: true)

Close the WebSocket connection.

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 109

def close(reason: nil, reconnect: true)
  transmit(
    type: ActionCable::INTERNAL[:message_types][:disconnect],
    reason: reason,
    reconnect: reconnect
  )
  websocket.close
end

#cookies (private)

The cookies of the request that initiated the WebSocket connection. Useful for performing authorization checks.

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 172

def cookies # :doc:
  request.cookie_jar
end

#decode(websocket_message) (private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 180

def decode(websocket_message)
  @coder.decode websocket_message
end

#dispatch_websocket_message(websocket_message)

This method is for internal use only.
[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 90

def dispatch_websocket_message(websocket_message) # :nodoc:
  if websocket.alive?
    handle_channel_command decode(websocket_message)
  else
    logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})"
  end
end

#encode(cable_message) (private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 176

def encode(cable_message)
  @coder.encode cable_message
end

#finished_request_message (private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 256

def finished_request_message
  'Finished "%s"%s for %s at %s' % [
    request.filtered_path,
    websocket.possible? ? " [WebSocket]" : "[non-WebSocket]",
    request.ip,
    Time.now.to_s ]
end

#handle_channel_command(payload)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 98

def handle_channel_command(payload)
  run_callbacks :command do
    subscriptions.execute_command payload
  end
end

#handle_close (private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 196

def handle_close
  logger.info finished_request_message

  server.remove_connection(self)

  subscriptions.unsubscribe_from_all
  unsubscribe_from_internal_channel

  disconnect if respond_to?(:disconnect)
end

#handle_open (private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 184

def handle_open
  @protocol = websocket.protocol
  connect if respond_to?(:connect)
  subscribe_to_internal_channel
  send_welcome_message

  message_buffer.process!
  server.add_connection(self)
rescue ActionCable::Connection::Authorization::UnauthorizedError
  close(reason: ActionCable::INTERNAL[:disconnect_reasons][:unauthorized], reconnect: false) if websocket.alive?
end

#inspect

This method is for internal use only.
[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 155

def inspect # :nodoc:
  "#<#{self.class.name}:#{'%#016x' % (object_id << 1)}>"
end

#invalid_request_message (private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 264

def invalid_request_message
  "Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [
    env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
  ]
end

#new_tagged_logger (private)

Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags.

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 242

def new_tagged_logger
  TaggedLoggerProxy.new server.logger,
    tags: server.config.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize }
end

#on_close(reason, code)

This method is for internal use only.
[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 151

def on_close(reason, code) # :nodoc:
  send_async :handle_close
end

#on_error(message)

This method is for internal use only.
[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 146

def on_error(message) # :nodoc:
  # log errors to make diagnosing socket errors easier
  logger.error "WebSocket error occurred: #{message}"
end

#on_message(message)

This method is for internal use only.
[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 142

def on_message(message) # :nodoc:
  message_buffer.append message
end

#on_open

This method is for internal use only.
[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 138

def on_open # :nodoc:
  send_async :handle_open
end

#process

This method is for internal use only.

Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user. This method should not be called directly – instead rely upon on the #connect (and #disconnect) callbacks.

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 74

def process # :nodoc:
  logger.info started_request_message

  if websocket.possible? && allow_request_origin?
    respond_to_successful_request
  else
    respond_to_invalid_request
  end
end

#receive(websocket_message)

This method is for internal use only.

Decodes WebSocket messages and dispatches them to subscribed channels. WebSocket message transfer encoding is always JSON.

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 86

def receive(websocket_message) # :nodoc:
  send_async :dispatch_websocket_message, websocket_message
end

#request (private)

The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 164

def request # :doc:
  @request ||= begin
    environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application
    ActionDispatch::Request.new(environment || env)
  end
end

#respond_to_invalid_request (private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 233

def respond_to_invalid_request
  close(reason: ActionCable::INTERNAL[:disconnect_reasons][:invalid_request]) if websocket.alive?

  logger.error invalid_request_message
  logger.info finished_request_message
  [ 404, { Rack::CONTENT_TYPE => "text/plain; charset=utf-8" }, [ "Page not found" ] ]
end

#respond_to_successful_request (private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 228

def respond_to_successful_request
  logger.info successful_request_message
  websocket.rack_response
end

#send_async(method, *arguments)

Invoke a method on the connection asynchronously through the pool of thread workers.

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 119

def send_async(method, *arguments)
  worker_pool.async_invoke(self, method, *arguments)
end

#send_welcome_message (private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 207

def send_welcome_message
  # Send welcome message to the internal connection monitor channel.
  # This ensures the connection monitor state is reset after a successful
  # websocket connection.
  transmit type: ActionCable::INTERNAL[:message_types][:welcome]
end

#started_request_message (private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 247

def started_request_message
  'Started %s "%s"%s for %s at %s' % [
    request.request_method,
    request.filtered_path,
    websocket.possible? ? " [WebSocket]" : "[non-WebSocket]",
    request.ip,
    Time.now.to_s ]
end

#statistics

Return a basic hash of statistics for the connection keyed with identifier, started_at, #subscriptions, and request_id. This can be returned by a health check against the connection.

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 125

def statistics
  {
    identifier: connection_identifier,
    started_at: @started_at,
    subscriptions: subscriptions.identifiers,
    request_id: @env["action_dispatch.request_id"]
  }
end

#successful_request_message (private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 270

def successful_request_message
  "Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [
    env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
  ]
end

#transmit(cable_message)

This method is for internal use only.
[ GitHub ]

  
# File 'actioncable/lib/action_cable/connection/base.rb', line 104

def transmit(cable_message) # :nodoc:
  websocket.transmit encode(cable_message)
end