123456789_123456789_123456789_123456789_123456789_

Class: ActiveJob::QueueAdapters::AsyncAdapter::Scheduler

Do not use. This class is for internal use only.
Relationships & Source Files
Inherits: Object
Defined in: activejob/lib/active_job/queue_adapters/async_adapter.rb

Constant Summary

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(**options) ⇒ Scheduler

[ GitHub ]

  
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 86

# Builds the two executors this scheduler switches between.
#
# options - keyword options merged over DEFAULT_EXECUTOR_OPTIONS and handed
#           to the thread pool executor's constructor.
#
# The scheduler starts with +immediate+ set to false.
def initialize(**options)
  pool_settings = DEFAULT_EXECUTOR_OPTIONS.merge(options)

  self.immediate = false
  @immediate_executor = Concurrent::ImmediateExecutor.new
  @async_executor = Concurrent::ThreadPoolExecutor.new(pool_settings)
end

Instance Attribute Details

#immediate (rw)

[ GitHub ]

  
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 84

# Flag read by #executor to pick @immediate_executor over @async_executor,
# and by #enqueue_at to bypass delayed scheduling. The constructor sets it
# to false.
attr_accessor :immediate

Instance Method Details

#enqueue(job, queue_name:)

[ GitHub ]

  
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 92

# Posts +job+ to the currently selected executor, which calls +job.perform+.
#
# job        - object responding to +perform+.
# queue_name - accepted for interface compatibility; not used here.
#
# Returns whatever the executor's +post+ returns.
def enqueue(job, queue_name:)
  executor.post(job) { |queued_job| queued_job.perform }
end

#enqueue_at(job, timestamp, queue_name:)

[ GitHub ]

  
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 96

# Runs +job+ at (or after) +timestamp+.
#
# job        - object responding to +perform+.
# timestamp  - numeric value compared against +Time.current.to_f+.
# queue_name - forwarded to #enqueue when the job is not delayed.
#
# When +immediate+ is set, or the timestamp is not in the future, the job is
# passed straight to #enqueue. Otherwise it is handed to
# Concurrent::ScheduledTask with the remaining delay and the current executor.
def enqueue_at(job, timestamp, queue_name:)
  seconds_until_due = timestamp - Time.current.to_f

  # Same short-circuit order as a `!immediate && delay > 0` check, inverted.
  run_now = immediate || !(seconds_until_due > 0)
  return enqueue(job, queue_name: queue_name) if run_now

  Concurrent::ScheduledTask.execute(seconds_until_due, args: [job], executor: executor) do |due_job|
    due_job.perform
  end
end

#executor

[ GitHub ]

  
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 110

# Selects the executor jobs are posted to: @immediate_executor when
# +immediate+ is truthy, @async_executor otherwise.
def executor
  return @immediate_executor if immediate

  @async_executor
end

#shutdown(wait: true)

[ GitHub ]

  
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 105

# Shuts down @async_executor.
#
# wait - when truthy (the default), also calls +wait_for_termination+ on the
#        executor and returns its result; otherwise returns nil right after
#        requesting shutdown.
def shutdown(wait: true)
  pool = @async_executor
  pool.shutdown
  return unless wait

  pool.wait_for_termination
end