Class: ActionCable::SubscriptionAdapter::PostgreSQL::Listener
Inherits: ActionCable::SubscriptionAdapter::SubscriberMap::Async
Defined in: actioncable/lib/action_cable/subscription_adapter/postgresql.rb
Class Method Summary
- .new(adapter, executor) ⇒ Listener constructor
::ActionCable::SubscriptionAdapter::SubscriberMap::Async - Inherited
::ActionCable::SubscriptionAdapter::SubscriberMap - Inherited
Instance Method Summary
::ActionCable::SubscriptionAdapter::SubscriberMap::Async - Inherited
::ActionCable::SubscriptionAdapter::SubscriberMap - Inherited
Constructor Details
.new(adapter, executor) ⇒ Listener
# File 'actioncable/lib/action_cable/subscription_adapter/postgresql.rb', line 82
def initialize(adapter, executor)
  super(executor)

  @adapter = adapter
  @queue = Queue.new

  @thread = Thread.new do
    Thread.current.abort_on_exception = true
    listen
  end
end
Instance Method Details
#add_channel(channel, on_success)
# File 'actioncable/lib/action_cable/subscription_adapter/postgresql.rb', line 125
def add_channel(channel, on_success)
  @queue.push([:listen, channel, on_success])
end
#listen
# File 'actioncable/lib/action_cable/subscription_adapter/postgresql.rb', line 94
def listen
  @adapter.with_subscriptions_connection do |pg_conn|
    catch :shutdown do
      loop do
        until @queue.empty?
          action, channel, callback = @queue.pop(true)

          case action
          when :listen
            pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
            @executor.post(&callback) if callback
          when :unlisten
            pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
          when :shutdown
            throw :shutdown
          end
        end

        pg_conn.wait_for_notify(1) do |chan, pid, message|
          broadcast(chan, message)
        end
      end
    end
  end
end
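The LISTEN and UNLISTEN statements above pass the channel name through `escape_identifier` before interpolating it into SQL. The following is only a rough pure-Ruby approximation of that quoting, assuming the usual SQL rule of wrapping an identifier in double quotes and doubling any embedded double quote. The helper name `quote_identifier` is hypothetical; the real method is provided by the pg gem and libpq and performs additional checks (for example on encoding) that this sketch omits.

```ruby
# Hypothetical helper, not part of Action Cable or the pg gem.
# Approximates SQL identifier quoting: wrap in double quotes and
# double any double quote that appears inside the name.
def quote_identifier(name)
  '"' + name.to_s.gsub('"', '""') + '"'
end

# Building a statement the way #listen does, but with the stand-in helper.
def listen_statement(channel)
  "LISTEN #{quote_identifier(channel)}"
end

puts listen_statement("chat_room")
puts listen_statement('odd"name')
```

Quoting the channel name this way keeps an unusual channel string from being interpreted as anything other than a single identifier.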
#remove_channel(channel)
# File 'actioncable/lib/action_cable/subscription_adapter/postgresql.rb', line 129
def remove_channel(channel)
  @queue.push([:unlisten, channel])
end
#shutdown
# File 'actioncable/lib/action_cable/subscription_adapter/postgresql.rb', line 120
def shutdown
  @queue.push([:shutdown])
  Thread.pass while @thread.alive?
end
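Taken together, the methods of this class follow a command-queue pattern: callers push `[:listen, ...]`, `[:unlisten, ...]` or `[:shutdown]` tuples, and a single background thread drains the queue between bounded waits. The sketch below illustrates that pattern without a database. `MiniListener`, its `log` queue and the `sleep` call are assumptions made for illustration (the `sleep` stands in for `wait_for_notify(1)`, and `Thread#join` is used in place of the `Thread.pass` loop); none of them are part of Action Cable.

```ruby
# Hypothetical miniature of the listener pattern; not Action Cable code.
class MiniListener
  attr_reader :log

  def initialize
    @queue = Queue.new          # commands from callers
    @log = Queue.new            # thread-safe record of processed commands
    @thread = Thread.new { run }
  end

  def add_channel(channel, on_success = nil)
    @queue.push([:listen, channel, on_success])
  end

  def remove_channel(channel)
    @queue.push([:unlisten, channel])
  end

  def shutdown
    @queue.push([:shutdown])
    @thread.join                # wait for the worker to exit
  end

  private

  def run
    catch :shutdown do
      loop do
        # Drain every pending command without blocking.
        until @queue.empty?
          action, channel, callback = @queue.pop(true)

          case action
          when :listen
            @log << "LISTEN #{channel}"
            callback&.call
          when :unlisten
            @log << "UNLISTEN #{channel}"
          when :shutdown
            throw :shutdown
          end
        end

        # Stand-in for the bounded wait on database notifications.
        sleep 0.01
      end
    end
  end
end

listener = MiniListener.new
listener.add_channel("chat", -> { puts "subscribed to chat" })
listener.remove_channel("chat")
listener.shutdown
```

Because the queue is first-in, first-out and only the worker thread touches the connection, commands pushed before `shutdown` are processed in order before the thread exits.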