Module: ActionCable::Channel::Streams

Relationships & Source Files
Defined in: actioncable/lib/action_cable/channel/streams.rb

Overview

Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data placed into it is automatically sent to the clients that are connected at that time. It’s purely an online queue, though. If you’re not streaming a broadcasting at the very moment it sends out an update, you will not get that update, even if you connect after it has been sent.
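
The "online queue" behaviour described above can be pictured with a toy in-memory pubsub. This is only a sketch: TinyPubSub and its methods are hypothetical names invented for illustration and are not part of Action Cable. The only point is that a late subscriber misses earlier messages because nothing is stored.

```ruby
# Toy in-memory pubsub; TinyPubSub is a hypothetical stand-in, not an Action Cable class.
class TinyPubSub
  def initialize
    # broadcasting name => list of callbacks currently streaming it
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(broadcasting, &callback)
    @subscribers[broadcasting] << callback
  end

  # Delivers only to callbacks registered right now; nothing is stored for later.
  def broadcast(broadcasting, message)
    @subscribers[broadcasting].each { |callback| callback.call(message) }
  end
end

pubsub = TinyPubSub.new
early = []
late = []

pubsub.subscribe("comments_for_45") { |message| early << message }
pubsub.broadcast("comments_for_45", "first")

# This subscriber connects after "first" was sent, so it never sees it.
pubsub.subscribe("comments_for_45") { |message| late << message }
pubsub.broadcast("comments_for_45", "second")

p early # => ["first", "second"]
p late  # => ["second"]
```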

Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between the two parties (the broadcaster and the channel subscriber). Here’s an example of a channel that allows subscribers to get all new comments on a given page:

class CommentsChannel < ApplicationCable::Channel
  def follow(data)
    stream_from "comments_for_#{data['recording_id']}"
  end

  def unfollow
    stop_all_streams
  end
end

Based on the above example, subscribers of this channel receive whatever data is put into, say, the comments_for_45 broadcasting as soon as it is put there.

An example broadcasting for this channel looks like so:

ActionCable.server.broadcast "comments_for_45", { author: 'DHH', content: 'Rails is just swell' }

If you have a stream that is related to a model, then the broadcasting used can be generated from the model and channel. The following example would subscribe to a broadcasting like comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE.

class CommentsChannel < ApplicationCable::Channel
  def subscribed
    post = Post.find(params[:id])
    stream_for post
  end
end

You can then broadcast to this channel using:

CommentsChannel.broadcast_to(@post, @comment)
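
The sample broadcasting name comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE can be reproduced by hand. This is a rough sketch: it assumes that the part after the colon is the unpadded, URL-safe Base64 form of the record's Global ID URI (decoding the sample gives gid://TestApp/Post/1) and that the prefix is the channel name. The helper sketch_broadcasting_name is hypothetical and is not a library method.

```ruby
# Hypothetical helper mirroring how a model-based broadcasting name appears to be built.
# Assumption: the record's Global ID URI is gid://TestApp/Post/1.
def sketch_broadcasting_name(channel_name, gid_uri)
  # URL-safe Base64 without padding, built with Array#pack so no extra require is needed.
  encoded = [gid_uri].pack("m0").tr("+/", "-_").delete("=")
  "#{channel_name}:#{encoded}"
end

puts sketch_broadcasting_name("comments", "gid://TestApp/Post/1")
# => comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE
```

Deriving the name from the record means stream_for and broadcast_to agree on the same broadcasting without either side spelling out a string.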

If you don’t just want to relay the broadcast unfiltered to the subscriber, you can also supply a callback that lets you alter what is sent out. The example below shows how you can use this to provide performance introspection in the process:

class ChatChannel < ApplicationCable::Channel
  def subscribed
    @room = Chat::Room[params[:room_number]]

    stream_for @room, coder: ActiveSupport::JSON do |message|
      if message['originated_at'].present?
        elapsed_time = (Time.now.to_f - message['originated_at']).round(2)

        ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
        logger.info "Message took #{elapsed_time}s to arrive"
      end

      transmit message
    end
  end
end

You can stop streaming from all broadcasts by calling #stop_all_streams.

Class Method Summary

::ActiveSupport::Concern - Extended

class_methods

Define class methods from given block.

included

Evaluate given block in context of base class, so that you can write class macros here.

prepended

Evaluate given block in context of base class, so that you can write class macros here.

DSL Calls

included

# File 'actioncable/lib/action_cable/channel/streams.rb', line 68

included do
  on_unsubscribe :stop_all_streams
end

Instance Attribute Details

#pubsub (readonly)

# File 'actioncable/lib/action_cable/channel/streams.rb', line 140

delegate :pubsub, to: :connection

Instance Method Details

#stop_all_streams

Unsubscribes all streams associated with this channel from the pubsub queue.

# File 'actioncable/lib/action_cable/channel/streams.rb', line 120

def stop_all_streams
  streams.each do |broadcasting, callback|
    pubsub.unsubscribe broadcasting, callback
    logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
  end.clear
end
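
The each ... end.clear chain in the method above relies on Hash#each returning the hash itself. A minimal sketch of that pattern follows, with a hypothetical RecordingPubSub standing in for the real pubsub adapter and a plain hash standing in for the channel's stream registry:

```ruby
# Hypothetical stand-in for the pubsub adapter; it only records unsubscribe calls.
class RecordingPubSub
  attr_reader :unsubscribed

  def initialize
    @unsubscribed = []
  end

  def unsubscribe(broadcasting, _callback)
    @unsubscribed << broadcasting
  end
end

# Same shape as stop_all_streams: Hash#each returns the hash itself,
# so chaining .clear empties the registry once every entry is unsubscribed.
def sketch_stop_all(streams, pubsub)
  streams.each do |broadcasting, callback|
    pubsub.unsubscribe(broadcasting, callback)
  end.clear
end

streams = { "comments_for_45" => -> {}, "comments_for_46" => -> {} }
pubsub = RecordingPubSub.new
sketch_stop_all(streams, pubsub)

p pubsub.unsubscribed # => ["comments_for_45", "comments_for_46"]
p streams             # => {}
```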

#stop_stream_for(model)

Unsubscribes streams for the model.

# File 'actioncable/lib/action_cable/channel/streams.rb', line 115

def stop_stream_for(model)
  stop_stream_from(broadcasting_for(model))
end

#stop_stream_from(broadcasting)

Unsubscribes streams from the named broadcasting.

# File 'actioncable/lib/action_cable/channel/streams.rb', line 106

def stop_stream_from(broadcasting)
  callback = streams.delete(broadcasting)
  if callback
    pubsub.unsubscribe(broadcasting, callback)
    logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
  end
end

#stream_for(model, callback = nil, coder: nil, &block)

Start streaming the pubsub queue for the model in this channel. Optionally, you can pass a callback that’ll be used instead of the default of just transmitting the updates straight to the subscriber.

Pass coder: ActiveSupport::JSON to decode messages as JSON before passing them to the callback. The default, coder: nil, does no decoding and passes raw messages through.

# File 'actioncable/lib/action_cable/channel/streams.rb', line 101

def stream_for(model, callback = nil, coder: nil, &block)
  stream_from(broadcasting_for(model), callback || block, coder: coder)
end

#stream_from(broadcasting, callback = nil, coder: nil, &block)

Start streaming from the named broadcasting pubsub queue. Optionally, you can pass a callback that’ll be used instead of the default of just transmitting the updates straight to the subscriber. Pass coder: ActiveSupport::JSON to decode messages as JSON before passing them to the callback. The default, coder: nil, does no decoding and passes raw messages through.

# File 'actioncable/lib/action_cable/channel/streams.rb', line 76

def stream_from(broadcasting, callback = nil, coder: nil, &block)
  broadcasting = String(broadcasting)

  # Don't send the confirmation until pubsub#subscribe is successful
  defer_subscription_confirmation!

  # Build a stream handler by wrapping the user-provided callback with
  # a decoder or defaulting to a JSON-decoding retransmitter.
  handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
  streams[broadcasting] = handler

  connection.server.event_loop.post do
    pubsub.subscribe(broadcasting, handler, lambda do
      ensure_confirmation_sent
      logger.info "#{self.class.name} is streaming from #{broadcasting}"
    end)
  end
end
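
The effect of the coder option on a user-supplied callback can be pictured as a thin wrapper. The sketch below is not the library's handler code: sketch_handler and SketchJSONCoder are hypothetical names, and SketchJSONCoder merely stands in for ActiveSupport::JSON by exposing a decode method backed by the standard json library.

```ruby
require "json"

# Hypothetical coder standing in for ActiveSupport::JSON; it only needs to respond to decode.
module SketchJSONCoder
  def self.decode(raw)
    JSON.parse(raw)
  end
end

# Hypothetical helper: wraps a callback so it receives decoded messages when a
# coder is given, and the raw string otherwise.
def sketch_handler(callback, coder: nil)
  if coder
    ->(raw) { callback.call(coder.decode(raw)) }
  else
    callback
  end
end

received = []
collect = ->(message) { received << message }
raw = JSON.generate("author" => "DHH", "content" => "Rails is just swell")

sketch_handler(collect).call(raw)                         # coder: nil, raw string
sketch_handler(collect, coder: SketchJSONCoder).call(raw) # decoded into a Hash

p received[0].class     # => String
p received[1]["author"] # => "DHH"
```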

#stream_or_reject_for(record)

Calls stream_for if the record is present; otherwise calls reject. This method is intended for cases where you look up a record based on a parameter: if the record is found, the channel starts streaming for it; if the record is nil, the subscription is rejected.

# File 'actioncable/lib/action_cable/channel/streams.rb', line 131

def stream_or_reject_for(record)
  if record
    stream_for record
  else
    reject
  end
end
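
To see the two branches without a running server, here is a small sketch using a hypothetical SketchChannel whose stream_for and reject only record that they were called. The real methods subscribe to pubsub and reject the subscription; the stub only copies the branching of the method above.

```ruby
# Hypothetical stub channel: stream_for and reject only record what was called.
class SketchChannel
  attr_reader :calls

  def initialize
    @calls = []
  end

  def stream_for(record)
    @calls << [:stream_for, record]
  end

  def reject
    @calls << [:reject]
  end

  # Same branching as the method above.
  def stream_or_reject_for(record)
    if record
      stream_for record
    else
      reject
    end
  end
end

found = SketchChannel.new
found.stream_or_reject_for(:post_1) # a lookup that returned a record

missing = SketchChannel.new
missing.stream_or_reject_for(nil)   # a lookup that returned nil

p found.calls   # => [[:stream_for, :post_1]]
p missing.calls # => [[:reject]]
```

In a real channel the argument would typically come from a finder that returns nil when nothing matches, for example Post.find_by(id: params[:id]) in an application that has a Post model.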