Module: ActiveJob::Continuable
| Relationships & Source Files | |
| Super Chains via Extension / Inclusion / Inheritance | |
|
Class Chain:
self,
::ActiveSupport::Concern
|
|
|
Instance Chain:
|
|
| Defined in: | activejob/lib/active_job/continuable.rb |
Overview
The Continuable module provides the ability to track the progress of your jobs, and continue from where they left off if interrupted.
Mix Continuable into your job to enable continuations.
See ActiveJob::Continuation for usage.
Constant Summary
Class Method Summary
::ActiveSupport::Concern - Extended
| class_methods | Define class methods from given block. |
| included | Evaluate given block in context of base class, so that you can write class macros here. |
| prepended | Evaluate given block in context of base class, so that you can write class macros here. |
| append_features, prepend_features | |
Instance Attribute Summary
-
#resumptions
rw
The number of times the job has been resumed.
- #continuation rw Internal use only
::ActiveModel::Attributes - Included
Instance Method Summary
-
#step(step_name, start: nil, isolated: false, &block)
Start a new continuation step.
- #continue(&block) private
- #checkpoint! Internal use only
- #deserialize(job_data) Internal use only
- #interrupt!(reason:) Internal use only
- #serialize Internal use only
- #resume_job(exception) private Internal use only
Attributes - Included
::ActiveModel::Attributes - Included
| #attribute_names | Returns an array of attribute names as strings. |
| #attributes | Returns a hash of all the attributes with their names as keys and the values of the attributes as values. |
| #_write_attribute, #attribute, | |
| #attribute= | Alias for ActiveModel::Attributes#_write_attribute. |
| #freeze, #initialize, #initialize_dup | |
::ActiveModel::AttributeMethods - Included
| #attribute_missing |
|
| #method_missing | Allows access to the object attributes, which are held in the hash returned by |
| #respond_to?, | |
| #respond_to_without_attributes? | A |
| #_read_attribute, #attribute_method?, | |
| #matched_attribute_method | Returns a struct representing the matching attribute method. |
| #missing_attribute | |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method in the class ActiveModel::AttributeMethods
DSL Calls
included
[ GitHub ]17 18 19 20 21 22 23 24 25 26 27 28 29
# File 'activejob/lib/active_job/continuable.rb', line 17
included do class_attribute :max_resumptions, instance_writer: false class_attribute :, instance_writer: false, default: { wait: 5.seconds } class_attribute :resume_errors_after_advancing, instance_writer: false, default: true around_perform :continue def initialize(...) super(...) self.resumptions = 0 self.continuation = Continuation.new(self, {}) end end
Instance Attribute Details
#continuation (rw)
# File 'activejob/lib/active_job/continuable.rb', line 34
attr_accessor :continuation # :nodoc:
#resumptions (rw)
The number of times the job has been resumed.
# File 'activejob/lib/active_job/continuable.rb', line 32
attr_accessor :resumptions
Instance Method Details
#checkpoint!
# File 'activejob/lib/active_job/continuable.rb', line 63
def checkpoint! # :nodoc: interrupt!(reason: :stopping) if queue_adapter.stopping? end
#continue(&block) (private)
[ GitHub ]# File 'activejob/lib/active_job/continuable.rb', line 73
def continue(&block) if continuation.started? self.resumptions += 1 instrument :resume, **continuation.instrumentation end block.call rescue Continuation::Interrupt => e resume_job(e) rescue Continuation::Error raise rescue StandardError => e if resume_errors_after_advancing? && continuation.advanced? resume_job(exception: e) else raise end end
#deserialize(job_data)
# File 'activejob/lib/active_job/continuable.rb', line 57
def deserialize(job_data) # :nodoc: super self.continuation = Continuation.new(self, job_data.fetch("continuation", {})) self.resumptions = job_data.fetch("resumptions", 0) end
#interrupt!(reason:)
# File 'activejob/lib/active_job/continuable.rb', line 67
def interrupt!(reason:) # :nodoc: instrument :interrupt, reason: reason, **continuation.instrumentation raise Continuation::Interrupt, "Interrupted #{continuation.description} (#{reason})" end
#resume_job(exception) (private)
# File 'activejob/lib/active_job/continuable.rb', line 92
def resume_job(exception) # :nodoc: executions_for(exception) if max_resumptions.nil? || resumptions < max_resumptions retry_job(**self.) else raise Continuation::ResumeLimitError, "Job was resumed a maximum of #{max_resumptions} times" end end
#serialize
# File 'activejob/lib/active_job/continuable.rb', line 53
def serialize # :nodoc: super.merge("continuation" => continuation.to_h, "resumptions" => resumptions) end
#step(step_name, start: nil, isolated: false, &block)
Start a new continuation step
# File 'activejob/lib/active_job/continuable.rb', line 37
def step(step_name, start: nil, isolated: false, &block) unless block_given? step_method = method(step_name) raise ArgumentError, "Step method '#{step_name}' must accept 0 or 1 arguments" if step_method.arity > 1 if step_method.parameters.any? { |type, name| type == :key || type == :keyreq } raise ArgumentError, "Step method '#{step_name}' must not accept keyword arguments" end block = step_method.arity == 0 ? -> (_) { step_method.call } : step_method end checkpoint! if continuation.advanced? continuation.step(step_name, start: start, isolated: isolated, &block) end