123456789_123456789_123456789_123456789_123456789_

Class: ActiveJob::QueueAdapters::AsyncAdapter

Relationships & Source Files
Namespace Children
Classes:
Inherits: Object
Defined in: activejob/lib/active_job/queue_adapters/async_adapter.rb

Overview

Active Job Async adapter

The Async adapter runs jobs with an in-process thread pool.

This is the default queue adapter. It’s well-suited for dev/test since it doesn’t need an external infrastructure, but it’s a poor fit for production since it drops pending jobs on restart.

To use this adapter, set queue adapter to :async:

config.active_job.queue_adapter = :async

To configure the adapter’s thread pool, instantiate the adapter and pass your own config:

config.active_job.queue_adapter = ActiveJob::QueueAdapters::AsyncAdapter.new \
  min_threads: 1,
  max_threads: 2 * Concurrent.processor_count,
  idletime: 600.seconds

The adapter uses a Concurrent Ruby thread pool to schedule and execute jobs. Since jobs share a single thread pool, long-running jobs will block short-lived jobs. Fine for dev/test; bad for production.

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(**executor_options) ⇒ AsyncAdapter

See Concurrent::ThreadPoolExecutor for executor options.

[ GitHub ]

  
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 35

def initialize(**executor_options)
  @scheduler = Scheduler.new(**executor_options)
end

Instance Attribute Details

#immediate=(immediate) (writeonly)

This method is for internal use only.

Used for our test suite.

[ GitHub ]

  
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 55

def immediate=(immediate) # :nodoc:
  @scheduler.immediate = immediate
end

Instance Method Details

#enqueue(job)

This method is for internal use only.
[ GitHub ]

  
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 39

def enqueue(job) # :nodoc:
  @scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
end

#enqueue_at(job, timestamp)

This method is for internal use only.
[ GitHub ]

  
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 43

def enqueue_at(job, timestamp) # :nodoc:
  @scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name
end

#shutdown(wait: true)

This method is for internal use only.

Gracefully stop processing jobs. Finishes in-progress work and handles any new jobs following the executor’s fallback policy (caller_runs). Waits for termination by default. Pass wait: false to continue.

[ GitHub ]

  
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 50

def shutdown(wait: true) # :nodoc:
  @scheduler.shutdown wait: wait
end