123456789_123456789_123456789_123456789_123456789_

Module: ActiveJob::Continuable

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
Instance Chain:
Defined in: activejob/lib/active_job/continuable.rb

Overview

The Continuable module provides the ability to track the progress of your jobs, and continue from where they left off if interrupted.

Mix Continuable into your job to enable continuations.

See ActiveJob::Continuation for usage.

Constant Summary

::ActiveModel::AttributeMethods - Included

CALL_COMPILABLE_REGEXP, NAME_COMPILABLE_REGEXP

Class Method Summary

::ActiveSupport::Concern - Extended

class_methods

Define class methods from given block.

included

Evaluate given block in context of base class, so that you can write class macros here.

prepended

Evaluate given block in context of base class, so that you can write class macros here.

append_features, prepend_features

Instance Attribute Summary

Instance Method Summary

Attributes - Included

::ActiveModel::Attributes - Included

#attribute_names

Returns an array of attribute names as strings.

#attributes

Returns a hash of all the attributes with their names as keys and the values of the attributes as values.

#_write_attribute, #attribute,
#attribute=
#freeze, #initialize, #initialize_dup

::ActiveModel::AttributeMethods - Included

#attribute_missing

attribute_missing is like method_missing, but for attributes.

#method_missing

Allows access to the object attributes, which are held in the hash returned by attributes, as though they were first-class methods.

#respond_to?,
#respond_to_without_attributes?

A Person instance with a name attribute can ask person.respond_to?(:name), person.respond_to?(:name=), and person.respond_to?(:name?) which will all return true.

#_read_attribute, #attribute_method?,
#matched_attribute_method

Returns a struct representing the matching attribute method.

#missing_attribute

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class ActiveModel::AttributeMethods

DSL Calls

included

[ GitHub ]


17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'activejob/lib/active_job/continuable.rb', line 17

# Evaluated in the context of the class that includes this module.
included do
  # Upper bound on how many times the job may be resumed. No default is
  # declared here; #resume_job treats nil as "no limit".
  class_attribute :max_resumptions, instance_writer: false
  # Keyword options splatted into retry_job when the job is resumed.
  class_attribute :resume_options, instance_writer: false, default: { wait: 5.seconds }
  # When truthy, #continue resumes the job instead of re-raising after a
  # StandardError, provided the continuation has already advanced.
  class_attribute :resume_errors_after_advancing, instance_writer: false, default: true

  # Registers #continue as the around_perform callback.
  around_perform :continue

  # Forwards all arguments to the superclass initializer, then starts the job
  # with zero resumptions and a Continuation built from an empty hash.
  def initialize(...)
    super(...)
    self.resumptions = 0
    self.continuation = Continuation.new(self, {})
  end
end

Instance Attribute Details

#continuation (rw)

This method is for internal use only.
[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 34

# The job's Continuation object. Assigned in #initialize and rebuilt from the
# serialized "continuation" entry in #deserialize.
attr_accessor :continuation # :nodoc:

#resumptions (rw)

The number of times the job has been resumed.

[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 32

# The number of times the job has been resumed. Starts at 0, is incremented by
# #continue when the continuation has already started, and is round-tripped
# through #serialize / #deserialize under the "resumptions" key.
attr_accessor :resumptions

Instance Method Details

#checkpoint!

This method is for internal use only.
[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 63

# Interrupts the job with reason :stopping when the queue adapter reports that
# it is stopping; otherwise does nothing and returns nil.
def checkpoint! # :nodoc:
  return unless queue_adapter.stopping?

  interrupt!(reason: :stopping)
end

#continue(&block) (private)

[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 73

# Around-perform callback: runs the job body and resumes the job when it is
# interrupted.
#
# When the continuation has already started, this run counts as a resumption:
# the counter is incremented and a :resume event is instrumented before the
# body runs.
#
# Rescue order is significant:
# * Continuation::Interrupt - the job is resumed via #resume_job.
# * Continuation::Error     - re-raised untouched, never resumed.
# * StandardError           - resumed only when resume_errors_after_advancing?
#   is truthy and the continuation has advanced; otherwise re-raised.
def continue(&block)
  if continuation.started?
    self.resumptions += 1
    instrument :resume, **continuation.instrumentation
  end

  block.call
rescue Continuation::Interrupt => e
  resume_job(e)
rescue Continuation::Error
  raise
rescue StandardError => e
  if resume_errors_after_advancing? && continuation.advanced?
    # Pass the exception positionally, exactly like the Interrupt branch.
    # #resume_job declares a single positional parameter, so `exception: e`
    # would arrive as the Hash { exception: e } instead of the exception.
    resume_job(e)
  else
    raise
  end
end

#deserialize(job_data)

This method is for internal use only.
[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 57

# Restores continuation state after the superclass has deserialized the job.
# Missing "continuation" / "resumptions" entries fall back to an empty hash
# and 0 respectively.
def deserialize(job_data) # :nodoc:
  super

  continuation_state = job_data.fetch("continuation") { {} }
  self.continuation = Continuation.new(self, continuation_state)

  resumed_count = job_data.fetch("resumptions") { 0 }
  self.resumptions = resumed_count
end

#interrupt!(reason:)

This method is for internal use only.
[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 67

# Instruments an :interrupt event (the reason plus the continuation's
# instrumentation payload) and then raises Continuation::Interrupt.
def interrupt!(reason:) # :nodoc:
  payload = { reason: reason, **continuation.instrumentation }
  instrument(:interrupt, **payload)

  message = "Interrupted #{continuation.description} (#{reason})"
  raise Continuation::Interrupt, message
end

#resume_job(exception) (private)

This method is for internal use only.
[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 92

# Passes the exception to executions_for, then retries the job with
# resume_options unless the resumption limit has been reached.
#
# A nil max_resumptions means there is no limit. Raises
# Continuation::ResumeLimitError once resumptions is no longer below the limit.
def resume_job(exception) # :nodoc:
  executions_for(exception)

  limit = max_resumptions
  can_resume = limit.nil? || resumptions < limit
  raise Continuation::ResumeLimitError, "Job was resumed a maximum of #{max_resumptions} times" unless can_resume

  retry_job(**resume_options)
end

#serialize

This method is for internal use only.
[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 53

# Extends the superclass's serialized hash with the continuation state and the
# resumption count, so both survive a round trip through #deserialize.
def serialize # :nodoc:
  base_payload = super

  continuable_state = {
    "continuation" => continuation.to_h,
    "resumptions" => resumptions
  }
  base_payload.merge(continuable_state)
end

#step(step_name, start: nil, isolated: false, &block)

Start a new continuation step.

[ GitHub ]

  
# File 'activejob/lib/active_job/continuable.rb', line 37

# Start a new continuation step.
#
# The step body is the given block or, when no block is given, the instance
# method named +step_name+. That method may require at most one positional
# argument and must not declare keyword arguments. A zero-arity method is
# wrapped in a one-argument lambda that ignores its argument.
#
# +start+ and +isolated+ are forwarded unchanged to the continuation's own
# +step+ method. If the continuation has already advanced, #checkpoint! runs
# first, so the job can be interrupted before the step begins.
#
# Raises ArgumentError when the step method's signature is not supported.
def step(step_name, start: nil, isolated: false, &block)
  unless block_given?
    step_method = method(step_name)
    parameters = step_method.parameters

    # Count required positionals rather than checking `arity > 1`: arity is
    # negative as soon as a method has optional or splat parameters, so a
    # signature like (a, b, c = nil) has arity -3 and would slip through, only
    # to fail later with a generic "wrong number of arguments" error.
    if parameters.count { |type, _| type == :req } > 1
      raise ArgumentError, "Step method '#{step_name}' must accept 0 or 1 arguments"
    end

    if parameters.any? { |type, _| type == :key || type == :keyreq }
      raise ArgumentError, "Step method '#{step_name}' must not accept keyword arguments"
    end

    block = step_method.arity == 0 ? -> (_) { step_method.call } : step_method
  end
  checkpoint! if continuation.advanced?
  continuation.step(step_name, start: start, isolated: isolated, &block)
end