Module: ActionCable::Channel::Streams
Relationships & Source Files | |
Extension / Inclusion / Inheritance Descendants | |
Included In:
| |
Super Chains via Extension / Inclusion / Inheritance | |
Class Chain:
self,
::ActiveSupport::Concern
|
|
Defined in: | actioncable/lib/action_cable/channel/streams.rb |
Overview
Streams
allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data placed into it is automatically sent to the clients that are connected at that time. It’s purely an online queue, though. If you’re not streaming a broadcasting at the very moment it sends out an update, you will not get that update, even if you connect after it has been sent.
Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between the two parties (the broadcaster and the channel subscriber). Here’s an example of a channel that allows subscribers to get all new comments on a given page:
class CommentsChannel < ApplicationCable::Channel
def follow(data)
stream_from "comments_for_#{data['recording_id']}"
end
def unfollow
stop_all_streams
end
end
Based on the above example, the subscribers of this channel will get whatever data is put into the, let’s say, comments_for_45
broadcasting as soon as it’s put there.
An example broadcasting for this channel looks like so:
ActionCable.server.broadcast "comments_for_45", { author: 'DHH', content: 'Rails is just swell' }
If you have a stream that is related to a model, then the broadcasting used can be generated from the model and channel. The following example would subscribe to a broadcasting like comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE
.
class CommentsChannel < ApplicationCable::Channel
def subscribed
post = Post.find(params[:id])
stream_for post
end
end
You can then broadcast to this channel using:
CommentsChannel.broadcast_to(@post, @comment)
If you don’t just want to parlay the broadcast unfiltered to the subscriber, you can also supply a callback that lets you alter what is sent out. The below example shows how you can use this to provide performance introspection in the process:
class ChatChannel < ApplicationCable::Channel
def subscribed
@room = Chat::Room[params[:room_number]]
stream_for @room, coder: ActiveSupport::JSON do ||
if ['originated_at'].present?
elapsed_time = (Time.now.to_f - ['originated_at']).round(2)
ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
logger.info "Message took #{elapsed_time}s to arrive"
end
transmit
end
end
end
You can stop streaming from all broadcasts by calling #stop_all_streams.
Class Method Summary
::ActiveSupport::Concern
- Extended
class_methods | Define class methods from given block. |
included | Evaluate given block in context of base class, so that you can write class macros here. |
prepended | Evaluate given block in context of base class, so that you can write class macros here. |
append_features, prepend_features |
Instance Attribute Summary
- #pubsub readonly
Instance Method Summary
-
#stop_all_streams
Unsubscribes all streams associated with this channel from the pubsub queue.
-
#stop_stream_for(model)
Unsubscribes streams for the
model
. -
#stop_stream_from(broadcasting)
Unsubscribes streams from the named
broadcasting
. -
#stream_for(model, callback = nil, coder: nil, &block)
Start streaming the pubsub queue for the
model
in this channel. -
#stream_from(broadcasting, callback = nil, coder: nil, &block)
Start streaming from the named
broadcasting
pubsub queue. -
#stream_or_reject_for(model)
Calls stream_for with the given
model
if it’s present to start streaming, otherwise rejects the subscription. -
#default_stream_handler(broadcasting, coder:)
private
May be overridden to change the default stream handling behavior which decodes JSON and transmits to the client.
- #identity_handler private
- #stream_decoder(handler = identity_handler, coder:) private
-
#stream_handler(broadcasting, user_handler, coder: nil)
private
May be overridden to add instrumentation, logging, specialized error handling, or other forms of handler decoration.
- #stream_transmitter(handler = identity_handler, broadcasting:) private
- #streams private
-
#worker_pool_stream_handler(broadcasting, user_handler, coder: nil)
private
Always wrap the outermost handler to invoke the user handler on the worker pool rather than blocking the event loop.
DSL Calls
included
[ GitHub ]80 81 82
# File 'actioncable/lib/action_cable/channel/streams.rb', line 80
included do on_unsubscribe :stop_all_streams end
Instance Attribute Details
#pubsub (readonly)
[ GitHub ]# File 'actioncable/lib/action_cable/channel/streams.rb', line 153
delegate :pubsub, to: :connection
Instance Method Details
#default_stream_handler(broadcasting, coder:) (private)
May be overridden to change the default stream handling behavior which decodes JSON and transmits to the client.
TODO: Tests demonstrating this.
TODO: Room for optimization. Update transmit API to be coder-aware so we can no-op when pubsub and connection are both JSON-encoded. Then we can skip decode+encode if we’re just proxying messages.
# File 'actioncable/lib/action_cable/channel/streams.rb', line 189
def default_stream_handler(broadcasting, coder:) coder ||= ActiveSupport::JSON stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting end
#identity_handler (private)
[ GitHub ]# File 'actioncable/lib/action_cable/channel/streams.rb', line 210
def identity_handler #=> message { message } end
#stop_all_streams
Unsubscribes all streams associated with this channel from the pubsub queue.
#stop_stream_for(model)
Unsubscribes streams for the model
.
# File 'actioncable/lib/action_cable/channel/streams.rb', line 130
def stop_stream_for(model) stop_stream_from(broadcasting_for(model)) end
#stop_stream_from(broadcasting)
Unsubscribes streams from the named broadcasting
.
#stream_decoder(handler = identity_handler, coder:) (private)
[ GitHub ]# File 'actioncable/lib/action_cable/channel/streams.rb', line 194
def stream_decoder(handler = identity_handler, coder:) if coder #=> message { handler.(coder.decode(message)) } else handler end end
#stream_for(model, callback = nil, coder: nil, &block)
Start streaming the pubsub queue for the model
in this channel. Optionally, you can pass a callback
that’ll be used instead of the default of just transmitting the updates straight to the subscriber.
Pass coder: ActiveSupport::JSON
to decode messages as JSON before passing to the callback. Defaults to coder: nil
which does no decoding, passes raw messages.
# File 'actioncable/lib/action_cable/channel/streams.rb', line 116
def stream_for(model, callback = nil, coder: nil, &block) stream_from(broadcasting_for(model), callback || block, coder: coder) end
#stream_from(broadcasting, callback = nil, coder: nil, &block)
Start streaming from the named broadcasting
pubsub queue. Optionally, you can pass a callback
that’ll be used instead of the default of just transmitting the updates straight to the subscriber. Pass ‘coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. Defaults to coder: nil
which does no decoding, passes raw messages.
# File 'actioncable/lib/action_cable/channel/streams.rb', line 90
def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! # Build a stream handler by wrapping the user-provided callback with a decoder # or defaulting to a JSON-decoding retransmitter. handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder) streams[broadcasting] = handler connection.server.event_loop.post do pubsub.subscribe(broadcasting, handler, lambda do ensure_confirmation_sent logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end end
#stream_handler(broadcasting, user_handler, coder: nil) (private)
May be overridden to add instrumentation, logging, specialized error handling, or other forms of handler decoration.
TODO: Tests demonstrating this.
# File 'actioncable/lib/action_cable/channel/streams.rb', line 173
def stream_handler(broadcasting, user_handler, coder: nil) if user_handler stream_decoder user_handler, coder: coder else default_stream_handler broadcasting, coder: coder end end
#stream_or_reject_for(model)
Calls stream_for with the given model
if it’s present to start streaming, otherwise rejects the subscription.
# File 'actioncable/lib/action_cable/channel/streams.rb', line 144
def stream_or_reject_for(model) if model stream_for model else reject end end
#stream_transmitter(handler = identity_handler, broadcasting:) (private)
[ GitHub ]# File 'actioncable/lib/action_cable/channel/streams.rb', line 202
def stream_transmitter(handler = identity_handler, broadcasting:) via = "streamed from #{broadcasting}" #=> (message) do transmit handler.( ), via: via end end
#streams (private)
[ GitHub ]# File 'actioncable/lib/action_cable/channel/streams.rb', line 155
def streams @_streams ||= {} end
#worker_pool_stream_handler(broadcasting, user_handler, coder: nil) (private)
Always wrap the outermost handler to invoke the user handler on the worker pool rather than blocking the event loop.
# File 'actioncable/lib/action_cable/channel/streams.rb', line 161
def worker_pool_stream_handler(broadcasting, user_handler, coder: nil) handler = stream_handler(broadcasting, user_handler, coder: coder) #=> message do connection.worker_pool.async_invoke handler, :call, , connection: connection end end