123456789_123456789_123456789_123456789_123456789_

Module: ActiveJob::Continuable

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
Defined in: activejob/lib/active_job/continuable.rb

Overview

The Continuable module provides the ability to track the progress of your jobs, and continue from where they left off if interrupted.

Mix Continuable into your job to enable continuations.

See ActiveJob::Continuation for usage.

Class Method Summary

::ActiveSupport::Concern - Extended

class_methods

Define class methods from given block.

included

Evaluate given block in context of base class, so that you can write class macros here.

prepended

Evaluate given block in context of base class, so that you can write class macros here.

append_features, prepend_features

Instance Attribute Summary

Instance Method Summary

DSL Calls

included

[ GitHub ]


16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'activejob/lib/active_job/continuable.rb', line 16

included do
  class_attribute :max_resumptions, instance_writer: false
  class_attribute :resume_options, instance_writer: false, default: { wait: 5.seconds }
  class_attribute :resume_errors_after_advancing, instance_writer: false, default: true

  around_perform :continue

  def initialize(...)
    super(...)
    self.resumptions = 0
    self.continuation = Continuation.new(self, {})
  end
end

Instance Attribute Details

#continuation (rw)

This method is for internal use only.
[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 33

attr_accessor :continuation # :nodoc:

#resumptions (rw)

The number of times the job has been resumed.

[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 31

attr_accessor :resumptions

Instance Method Details

#checkpoint!

This method is for internal use only.
[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 62

def checkpoint! # :nodoc:
  interrupt! if queue_adapter.stopping?
end

#continue(&block) (private)

[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 67

def continue(&block)
  if continuation.started?
    self.resumptions += 1
    instrument :resume, **continuation.instrumentation
  end

  block.call
rescue Continuation::Interrupt => e
  resume_job(e)
rescue Continuation::Error
  raise
rescue StandardError => e
  if resume_errors_after_advancing? && continuation.advanced?
    resume_job(exception: e)
  else
    raise
  end
end

#deserialize(job_data)

This method is for internal use only.
[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 56

def deserialize(job_data) # :nodoc:
  super
  self.continuation = Continuation.new(self, job_data.fetch("continuation", {}))
  self.resumptions = job_data.fetch("resumptions", 0)
end

#interrupt! (private)

This method is for internal use only.
[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 95

def interrupt! # :nodoc:
  instrument :interrupt, **continuation.instrumentation
  raise Continuation::Interrupt, "Interrupted #{continuation.description}"
end

#resume_job(exception) (private)

This method is for internal use only.
[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 86

def resume_job(exception) # :nodoc:
  executions_for(exception)
  if max_resumptions.nil? || resumptions < max_resumptions
    retry_job(**self.resume_options)
  else
    raise Continuation::ResumeLimitError, "Job was resumed a maximum of #{max_resumptions} times"
  end
end

#serialize

This method is for internal use only.
[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 52

def serialize # :nodoc:
  super.merge("continuation" => continuation.to_h, "resumptions" => resumptions)
end

#step(step_name, start: nil, &block)

Start a new continuation step

[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 36

def step(step_name, start: nil, &block)
  unless block_given?
    step_method = method(step_name)

    raise ArgumentError, "Step method '#{step_name}' must accept 0 or 1 arguments" if step_method.arity > 1

    if step_method.parameters.any? { |type, name| type == :key || type == :keyreq }
      raise ArgumentError, "Step method '#{step_name}' must not accept keyword arguments"
    end

    block = step_method.arity == 0 ? -> (_) { step_method.call } : step_method
  end
  checkpoint! if continuation.advanced?
  continuation.step(step_name, start: start, &block)
end