Class: ActiveJob::Continuation

Relationships & Source Files
Inherits: Object
Defined in: activejob/lib/active_job/continuation.rb,
activejob/lib/active_job/continuation/step.rb,
activejob/lib/active_job/continuation/test_helper.rb

Overview

Continuations provide a mechanism for interrupting and resuming jobs. This allows long-running jobs to make progress across application restarts.

Jobs should include the Continuable module to enable continuations. Continuable jobs are automatically retried when interrupted.

Use the #step method to define the steps in your job. Steps can use an optional cursor to track progress in the step.

Steps are executed as soon as they are encountered. If a job is interrupted, previously completed steps will be skipped. If a step is in progress, it will be resumed with the last recorded cursor.

Code that is not part of a step will be executed on each job run.
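
The skipping behavior can be illustrated without Rails. The following is a toy model under stated assumptions, not Active Job's implementation: `ToyRun`, its `completed` list, and the `perform` helper are hypothetical names that stand in for the continuation state and a job body.

```ruby
# Toy model of step skipping (hypothetical; not the Active Job implementation).
class ToyRun
  attr_reader :completed, :log

  def initialize(completed = [])
    @completed = completed.dup # names of steps finished in earlier runs
    @log = []
  end

  # Runs the block unless the step was already completed in a previous run.
  def step(name)
    if @completed.include?(name)
      @log << "skipped #{name}"
    else
      yield
      @completed << name
      @log << "ran #{name}"
    end
  end
end

def perform(run)
  run.log << "setup" # not inside a step, so it runs on every execution
  run.step(:validate) { }
  run.step(:process) { }
end

first = ToyRun.new
perform(first)

# Pretend :validate finished before an interruption.
resumed = ToyRun.new([:validate])
perform(resumed)
```

In this sketch the resumed run still records "setup", which mirrors the rule that code outside a step is executed on each job run.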

You can pass a block or a method name to the step method. The block will be called with the step object as an argument. Methods can either take no arguments or a single argument for the step object.

class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  def perform(import_id)
    # This always runs, even if the job is resumed.
    @import = Import.find(import_id)

    step :validate do
      @import.validate!
    end

    step(:process_records) do |step|
      @import.records.find_each(start: step.cursor) do |record|
        record.process
        step.advance! from: record.id
      end
    end

    step :reprocess_records
    step :finalize
  end

  def reprocess_records(step)
    @import.records.find_each(start: step.cursor) do |record|
      record.reprocess
      step.advance! from: record.id
    end
  end

  def finalize
    @import.finalize!
  end
end
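
One way to picture the choice between a block and a method name is a dispatch on the method's arity. This is a minimal sketch, assuming a dispatch of that kind; `ToyJob` and the hash used as a stand-in step object are hypothetical, and the real Continuable module may handle this differently.

```ruby
# Hypothetical sketch of dispatching a step to a block or a named method.
class ToyJob
  attr_reader :calls

  def initialize
    @calls = []
  end

  def step(name, &block)
    step_object = { name: name } # stand-in for the real step object
    if block
      block.call(step_object)
    else
      target = method(name)
      # Arity 0: call without arguments; otherwise pass the step object.
      target.arity.zero? ? target.call : target.call(step_object)
    end
  end

  def run
    step(:inline) { |s| @calls << [:inline, s[:name]] }
    step :with_step
    step :without_step
  end

  private

  def with_step(step)
    @calls << [:with_step, step[:name]]
  end

  def without_step
    @calls << [:without_step, nil]
  end
end

job = ToyJob.new
job.run
```

Here `with_step` receives the stand-in step object, while `without_step` is called with no arguments.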

Cursors

Cursors are used to track progress within a step. The cursor can be any object that is serializable as an argument to ActiveJob::Base.serialize. It defaults to nil.

When a step is resumed, the last cursor value is restored. The code in the step is responsible for using the cursor to continue from the right point.

set! sets the cursor to a specific value.

step :iterate_items do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set! (step.cursor || 0) + 1
  end
end

A starting value for the cursor can be set when defining the step:

step :iterate_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set! step.cursor + 1
  end
end

The cursor can be advanced with advance!. This calls succ on the current cursor value. It raises an UnadvanceableCursorError if the cursor does not implement succ.

step :iterate_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.advance!
  end
end
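
Because advance! depends on succ, it can help to check which candidate cursor values respond to it. The snippet below is plain Ruby and does not touch Active Job; the list of candidate values is made up for illustration.

```ruby
# Plain Ruby: which candidate cursor values respond to succ?
candidates = [0, "a", "az", nil, [0, 0], 1.5]

advanceable = candidates.select { |value| value.respond_to?(:succ) }
next_values = advanceable.map(&:succ)
```

Integers and strings respond to succ, while nil, arrays, and floats do not. This suggests that a step which keeps the default nil cursor, or uses an array cursor, would need a start value, a from argument, or set! instead of a bare advance!.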

You can optionally pass a from argument to advance!. This is useful when iterating over a collection of records where IDs may not be contiguous.

step :process_records do |step|
  import.records.find_each(start: step.cursor) do |record|
    record.process
    step.advance! from: record.id
  end
end

You can use an array to iterate over nested records:

step :process_nested_records, start: [ 0, 0 ] do |step|
  Account.find_each(start: step.cursor[0]) do |account|
    account.records.find_each(start: step.cursor[1]) do |record|
      record.process
      step.set! [ account.id, record.id + 1 ]
    end
    step.set! [ account.id + 1, 0 ]
  end
  end
end
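
The two-element cursor can be traced in plain Ruby. This is a sketch with made-up data: the `accounts` hash (account ID mapped to record IDs) and the `process_nested` helper are hypothetical, and a local variable stands in for the step's cursor.

```ruby
# Toy trace of a nested [account_id, record_id] cursor (hypothetical data).
def process_nested(accounts, cursor)
  processed = []
  accounts.each do |account_id, record_ids|
    next if account_id < cursor[0] # account finished in an earlier run
    record_ids.each do |record_id|
      next if record_id < cursor[1] # record finished in an earlier run
      processed << record_id
      cursor = [account_id, record_id + 1]
    end
    # Move to the next account and reset the inner position.
    cursor = [account_id + 1, 0]
  end
  [processed, cursor]
end

accounts = { 1 => [10, 11], 2 => [20, 21, 22] }

fresh_processed, fresh_cursor = process_nested(accounts, [0, 0])
resumed_processed, resumed_cursor = process_nested(accounts, [2, 21])
```

Resetting the second element to 0 after each account matters: without it, the inner position from one account would wrongly skip records in the next.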

Setting or advancing the cursor creates a checkpoint. You can also create a checkpoint manually by calling the #checkpoint! method on the step. This is useful if you want to allow interruptions, but don’t need to update the cursor.

step :destroy_records do |step|
  import.records.find_each do |record|
    record.destroy!
    step.checkpoint!
  end
end

Checkpoints

A checkpoint is where a job can be interrupted. At a checkpoint the job will call queue_adapter.stopping?. If it returns true, the job will raise an Interrupt exception.

There is an automatic checkpoint at the end of each step. Within a step, one is created by calling set!, advance!, or #checkpoint!.

Jobs are not interrupted the moment the queue adapter is marked as stopping; they continue to run until they reach the next checkpoint or the process is stopped.

This is to allow jobs to be interrupted at a safe point, but it also means that the jobs should checkpoint more frequently than the shutdown timeout to ensure a graceful restart.
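
The interplay between a stopping adapter and checkpoints can be sketched in plain Ruby. Everything here is a toy under stated assumptions: `ToyInterrupt`, `ToyAdapter`, its `clear_checks:` option, and `process_all` are hypothetical names, not part of Active Job.

```ruby
# Toy checkpoint loop (hypothetical names; not Active Job's implementation).
class ToyInterrupt < StandardError; end

class ToyAdapter
  # clear_checks: how many calls to stopping? report false before it turns true.
  def initialize(clear_checks:)
    @remaining = clear_checks
  end

  def stopping?
    @remaining -= 1
    @remaining < 0
  end
end

def process_all(items, adapter, cursor: 0)
  items[cursor..].each do |item|
    # ... work on item would happen here ...
    cursor += 1
    # Checkpoint: the only place where this loop can be interrupted.
    raise ToyInterrupt if adapter.stopping?
  end
  { status: :finished, cursor: cursor }
rescue ToyInterrupt
  { status: :interrupted, cursor: cursor }
end

items = %w[a b c d e]
first = process_all(items, ToyAdapter.new(clear_checks: 2))
second = process_all(items, ToyAdapter.new(clear_checks: 10), cursor: first[:cursor])
```

The first run stops only when it reaches a checkpoint after the adapter starts reporting true, and the second run picks up from the recorded cursor instead of starting over.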

When interrupted, the job will automatically retry with the progress serialized in the job data under the continuation key.

The serialized progress contains:

  • a list of the completed steps

  • the current step and its cursor value (if one is in progress)
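
As an illustration of that payload, the snippet below builds a progress hash and reads it back after a JSON round trip. The step names and cursor value are made up, and the [name, cursor] pair for the current step is an assumption inferred from how the constructor on this page splats the "current" entry.

```ruby
require "json"

# Illustrative progress payload; step names and cursor value are made up.
progress = {
  "completed" => ["validate"],
  "current" => ["process_records", 1042] # assumed [name, cursor] pair
}

restored = JSON.parse(JSON.generate(progress))

# Mirror the constructor's handling of the "completed" list.
completed = restored.fetch("completed", []).map(&:to_sym)
current_name, current_cursor = restored["current"]
```

Step names travel as strings and are converted back to symbols on load, so the cursor is the only part whose type depends on what the step stored.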

Errors

If a job raises an error and is not retried via Active Job, it will be passed back to the underlying queue backend and any progress in this execution will be lost.

To mitigate this, the job will be automatically retried if it raises an error after it has made progress. Making progress is defined as having completed a step or advanced the cursor within the current step.

Constructor Details

.new(job, serialized_progress) ⇒ Continuation

# File 'activejob/lib/active_job/continuation.rb', line 186

def initialize(job, serialized_progress)
  @job = job
  @completed = serialized_progress.fetch("completed", []).map(&:to_sym)
  @current = new_step(*serialized_progress["current"], resumed: true) if serialized_progress.key?("current")
  @encountered_step_names = []
  @advanced = false
  @running_step = false
end

Instance Attribute Details

#advanced? ⇒ Boolean (readonly, private)

# File 'activejob/lib/active_job/continuation.rb', line 232

def advanced?
  @advanced
end

#completed?(name) ⇒ Boolean (readonly, private)

# File 'activejob/lib/active_job/continuation.rb', line 244

def completed?(name)
  completed.include?(name)
end

#current (readonly, private)

# File 'activejob/lib/active_job/continuation.rb', line 230

attr_reader :job, :encountered_step_names, :completed, :current

#encountered_step_names (readonly, private)

# File 'activejob/lib/active_job/continuation.rb', line 230

attr_reader :job, :encountered_step_names, :completed, :current

#job (readonly, private)

# File 'activejob/lib/active_job/continuation.rb', line 230

attr_reader :job, :encountered_step_names, :completed, :current

#running_step? ⇒ Boolean (readonly, private)

# File 'activejob/lib/active_job/continuation.rb', line 236

def running_step?
  @running_step
end

#started? ⇒ Boolean (readonly, private)

# File 'activejob/lib/active_job/continuation.rb', line 240

def started?
  completed.any? || current.present?
end

Instance Method Details

#checkpoint! (private)

# File 'activejob/lib/active_job/continuation.rb', line 288

def checkpoint!
  interrupt! if job.queue_adapter.stopping?
end

#completed (readonly, private)

# File 'activejob/lib/active_job/continuation.rb', line 230

attr_reader :job, :encountered_step_names, :completed, :current

#continue(&block)

# File 'activejob/lib/active_job/continuation.rb', line 195

def continue(&block)
  wrapping_errors_after_advancing do
    instrument_job :resume if started?
    block.call
  end
end

#description

# File 'activejob/lib/active_job/continuation.rb', line 219

def description
  if current
    current.description
  elsif completed.any?
    "after '#{completed.last}'"
  else
    "not started"
  end
end

#instrument(event, payload = {}) (private)

# File 'activejob/lib/active_job/continuation.rb', line 317

def instrument(event, payload = {})
  job.send(:instrument, event, **payload)
end

#instrument_job(event) (private)

# File 'activejob/lib/active_job/continuation.rb', line 313

def instrument_job(event)
  instrument event, description: description, completed_steps: completed, current_step: current
end

#instrumenting_step(step, &block) (private)

# File 'activejob/lib/active_job/continuation.rb', line 302

def instrumenting_step(step, &block)
  instrument (step.resumed? ? :step_resumed : :step_started), step: step

  block.call

  instrument :step_completed, step: step
rescue Interrupt
  instrument :step_interrupted, step: step
  raise
end

#interrupt! (private)

Raises: Interrupt

# File 'activejob/lib/active_job/continuation.rb', line 283

def interrupt!
  instrument_job :interrupt
  raise Interrupt, "Interrupted #{description}"
end

#new_step(*args, **options) (private)

# File 'activejob/lib/active_job/continuation.rb', line 257

def new_step(*args, **options)
  Step.new(*args, **options) { checkpoint! }
end

#run_step(name, start:, &block) (private)

# File 'activejob/lib/active_job/continuation.rb', line 265

def run_step(name, start:, &block)
  @running_step = true
  @current ||= new_step(name, start, resumed: false)

  instrumenting_step(current) do
    block.call(current)
  end

  @completed << current.name
  @current = nil
  @advanced = true

  checkpoint!
ensure
  @running_step = false
  @advanced ||= current&.advanced?
end

#skip_step(name) (private)

# File 'activejob/lib/active_job/continuation.rb', line 261

def skip_step(name)
  instrument :step_skipped, step: name
end

#step(name, start:, &block)

# File 'activejob/lib/active_job/continuation.rb', line 202

def step(name, start:, &block)
  validate_step!(name)

  if completed?(name)
    skip_step(name)
  else
    run_step(name, start: start, &block)
  end
end

#to_h

# File 'activejob/lib/active_job/continuation.rb', line 212

def to_h
  {
    "completed" => completed.map(&:to_s),
    "current" => current&.to_a
  }.compact
end

#validate_step!(name) (private)

Raises: InvalidStepError

# File 'activejob/lib/active_job/continuation.rb', line 248

def validate_step!(name)
  raise InvalidStepError, "Step '#{name}' must be a Symbol, found '#{name.class}'" unless name.is_a?(Symbol)
  raise InvalidStepError, "Step '#{name}' has already been encountered" if encountered_step_names.include?(name)
  raise InvalidStepError, "Step '#{name}' is nested inside step '#{current.name}'" if running_step?
  raise InvalidStepError, "Step '#{name}' found, expected to resume from '#{current.name}'" if current && current.name != name && !completed?(name)

  encountered_step_names << name
end

#wrapping_errors_after_advancing(&block) (private)

# File 'activejob/lib/active_job/continuation.rb', line 292

def wrapping_errors_after_advancing(&block)
  block.call
rescue StandardError => e
  if !e.is_a?(Error) && advanced?
    raise AfterAdvancingError, "Advanced job failed with error: #{e.message}"
  else
    raise
  end
end