Module: ActiveJob::Continuable
Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Class Chain: self, ::ActiveSupport::Concern
Defined in: activejob/lib/active_job/continuable.rb
Overview
The Continuable module provides the ability to track the progress of your jobs, and continue from where they left off if interrupted.
Mix Continuable into your job to enable continuations.
See ActiveJob::Continuation for usage.
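The idea of continuing from where a job left off can be pictured with a toy model in plain Ruby. This is only a sketch and not ActiveJob's implementation: ToyInterrupt, ToyStepJob, and the completed array are hypothetical names, and a real job would persist its progress through the queue instead of passing it in by hand.

```ruby
# Toy model only: a plain-Ruby stand-in, not ActiveJob's implementation.
# ToyInterrupt, ToyStepJob, and `completed` are hypothetical names.
class ToyInterrupt < StandardError; end

class ToyStepJob
  attr_reader :completed, :log

  def initialize(completed = [])
    @completed = completed # names of steps finished in earlier runs
    @log = []              # steps actually executed in this run
  end

  # Run the block once; skip it if an earlier run already finished it.
  def step(name)
    return if @completed.include?(name)

    yield
    @completed << name
  end

  def perform(interrupt_after: nil)
    %i[load process cleanup].each do |name|
      step(name) { @log << name }
      raise ToyInterrupt, "interrupted after #{name}" if name == interrupt_after
    end
  end
end

first = ToyStepJob.new
begin
  first.perform(interrupt_after: :load)
rescue ToyInterrupt
  # A real queue would re-enqueue the job together with its saved progress.
end

# The second run starts from the progress recorded by the first run.
second = ToyStepJob.new(first.completed.dup)
second.perform

first.log  # => [:load]
second.log # => [:process, :cleanup]
```

The second run skips the load step because it is already recorded as completed, which is the behaviour the overview describes at a much smaller scale.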
Class Method Summary
::ActiveSupport::Concern - Extended
- class_methods: Define class methods from given block.
- included: Evaluate given block in context of base class, so that you can write class macros here.
- prepended: Evaluate given block in context of base class, so that you can write class macros here.
- append_features, prepend_features
Instance Attribute Summary
- #resumptions rw The number of times the job has been resumed.
- #continuation rw Internal use only
Instance Method Summary
- #step(step_name, start: nil, &block) Start a new continuation step.
- #continue(&block) private
- #checkpoint! Internal use only
- #deserialize(job_data) Internal use only
- #serialize Internal use only
- #interrupt! private Internal use only
- #resume_job(exception) private Internal use only
DSL Calls
included
# File 'activejob/lib/active_job/continuable.rb', line 16
included do
  class_attribute :max_resumptions, instance_writer: false
  class_attribute :, instance_writer: false, default: { wait: 5.seconds }
  class_attribute :resume_errors_after_advancing, instance_writer: false, default: true

  around_perform :continue

  def initialize(...)
    super(...)
    self.resumptions = 0
    self.continuation = Continuation.new(self, {})
  end
end
Instance Attribute Details
#continuation (rw)
This method is for internal use only.
# File 'activejob/lib/active_job/continuable.rb', line 33
attr_accessor :continuation # :nodoc:
#resumptions (rw)
The number of times the job has been resumed.
# File 'activejob/lib/active_job/continuable.rb', line 31
attr_accessor :resumptions
Instance Method Details
#checkpoint!
This method is for internal use only.
# File 'activejob/lib/active_job/continuable.rb', line 62
def checkpoint! # :nodoc:
  interrupt! if queue_adapter.stopping?
end
#continue(&block) (private)
# File 'activejob/lib/active_job/continuable.rb', line 67
def continue(&block)
  if continuation.started?
    self.resumptions += 1
    instrument :resume, **continuation.instrumentation
  end

  block.call
rescue Continuation::Interrupt => e
  resume_job(e)
rescue Continuation::Error
  raise
rescue StandardError => e
  if resume_errors_after_advancing? && continuation.advanced?
    resume_job(exception: e)
  else
    raise
  end
end
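The order of the rescue clauses above decides whether a failure leads to a resume or is re-raised. The sketch below reproduces only that decision in plain Ruby. ToyError, ToyInterrupt, and classify are hypothetical names, and the assumption that the interrupt class is a subclass of the general continuation error class is mine, not something this page states.

```ruby
# Hypothetical stand-ins for the continuation error classes.
# Assumption: the interrupt error is a subclass of the general error.
class ToyError < StandardError; end
class ToyInterrupt < ToyError; end

# Returns :completed, :resume, or :raise depending on what the block does,
# using the same rescue order as #continue.
def classify(resume_errors_after_advancing:, advanced:)
  yield
  :completed
rescue ToyInterrupt
  :resume # an interrupt always leads to a resume attempt
rescue ToyError
  :raise  # other continuation errors are never retried
rescue StandardError
  # ordinary errors resume only if the job made progress and the flag is set
  resume_errors_after_advancing && advanced ? :resume : :raise
end

flags = { resume_errors_after_advancing: true, advanced: true }

ok          = classify(**flags) { :work_done }
interrupted = classify(**flags) { raise ToyInterrupt }
broken      = classify(**flags) { raise ToyError }
progressed  = classify(**flags) { raise "boom" }
stalled     = classify(**flags, advanced: false) { raise "boom" }
```

Because Ruby tries rescue clauses from top to bottom, the interrupt clause has to come before the broader error clause. With the order reversed, an interrupt would be re-raised instead of resumed under the subclass assumption stated above.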
#deserialize(job_data)
This method is for internal use only.
# File 'activejob/lib/active_job/continuable.rb', line 56
def deserialize(job_data) # :nodoc:
  super
  self.continuation = Continuation.new(self, job_data.fetch("continuation", {}))
  self.resumptions = job_data.fetch("resumptions", 0)
end
#interrupt! (private)
This method is for internal use only.
# File 'activejob/lib/active_job/continuable.rb', line 95
def interrupt! # :nodoc:
  instrument :interrupt, **continuation.instrumentation
  raise Continuation::Interrupt, "Interrupted #{continuation.description}"
end
#resume_job(exception) (private)
This method is for internal use only.
# File 'activejob/lib/active_job/continuable.rb', line 86
def resume_job(exception) # :nodoc:
  executions_for(exception)
  if max_resumptions.nil? || resumptions < max_resumptions
    retry_job(**self. )
  else
    raise Continuation::ResumeLimitError, "Job was resumed a maximum of #{max_resumptions} times"
  end
end
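The limit check in the listing above can be isolated as a small plain-Ruby sketch. ToyResumeLimitError and toy_resume are hypothetical names. The real method also records the execution and re-enqueues the job, and both of those are left out here.

```ruby
# Hypothetical stand-in for the resume limit error.
class ToyResumeLimitError < StandardError; end

# Returns :retry while the job may still be resumed, raises otherwise.
# A nil max_resumptions means there is no limit.
def toy_resume(resumptions:, max_resumptions:)
  if max_resumptions.nil? || resumptions < max_resumptions
    :retry
  else
    raise ToyResumeLimitError, "Job was resumed a maximum of #{max_resumptions} times"
  end
end

unlimited = toy_resume(resumptions: 1_000, max_resumptions: nil)
under     = toy_resume(resumptions: 1, max_resumptions: 2)

at_limit =
  begin
    toy_resume(resumptions: 2, max_resumptions: 2)
  rescue ToyResumeLimitError => e
    e.message
  end
```

The comparison is strict, so in this sketch a job whose resumption count has reached max_resumptions raises instead of retrying.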
#serialize
This method is for internal use only.
# File 'activejob/lib/active_job/continuable.rb', line 52
def serialize # :nodoc:
  super.merge("continuation" => continuation.to_h, "resumptions" => resumptions)
end
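Together, serialize and deserialize carry the job's progress through its serialized payload. The round trip can be sketched in plain Ruby as below. ToyBase, ToyContinuable, ToyRoundTripJob, and the progress attribute are hypothetical stand-ins, with a plain Hash used where the real code wraps the data in a Continuation object.

```ruby
# Hypothetical base class standing in for the job's existing serialization.
class ToyBase
  def serialize
    { "job_class" => self.class.name }
  end

  def deserialize(_job_data); end
end

# Adds two extra keys on the way out and reads them back on the way in.
module ToyContinuable
  attr_accessor :progress, :resumptions

  def serialize
    super.merge("continuation" => progress, "resumptions" => resumptions)
  end

  def deserialize(job_data)
    super
    # fetch with a default keeps payloads without these keys loadable
    self.progress = job_data.fetch("continuation", {})
    self.resumptions = job_data.fetch("resumptions", 0)
  end
end

class ToyRoundTripJob < ToyBase
  include ToyContinuable
end

job = ToyRoundTripJob.new
job.progress = { "completed" => ["load"] }
job.resumptions = 1
payload = job.serialize

restored = ToyRoundTripJob.new
restored.deserialize(payload)

# A payload that lacks the extra keys falls back to the defaults.
legacy = ToyRoundTripJob.new
legacy.deserialize({ "job_class" => "ToyRoundTripJob" })
```

Using fetch with a default, as the original deserialize does, means a payload that lacks the continuation keys still loads with empty progress and zero resumptions.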
#step(step_name, start: nil, &block)
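Start a new continuation step. When no block is given, the method named step_name is used as the step body; it must accept 0 or 1 positional arguments and no keyword arguments.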
# File 'activejob/lib/active_job/continuable.rb', line 36
def step(step_name, start: nil, &block)
  unless block_given?
    step_method = method(step_name)

    raise ArgumentError, "Step method '#{step_name}' must accept 0 or 1 arguments" if step_method.arity > 1

    if step_method.parameters.any? { |type, name| type == :key || type == :keyreq }
      raise ArgumentError, "Step method '#{step_name}' must not accept keyword arguments"
    end

    block = step_method.arity == 0 ? -> (_) { step_method.call } : step_method
  end
  checkpoint! if continuation.advanced?
  continuation.step(step_name, start: start, &block)
end
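The method-based form of step relies on Ruby's Method#arity and Method#parameters. The sketch below copies just those checks into a hypothetical ToyArity class so they can be tried without a job. The wrap method and the four sample methods are illustrative names only.

```ruby
class ToyArity
  def no_args
    :no_args
  end

  def one_arg(step)
    [:one_arg, step]
  end

  def two_args(first, second); end

  def with_keyword(step, limit: 10); end

  # Same validation as #step: returns a callable that takes one argument.
  def wrap(step_name)
    step_method = method(step_name)

    raise ArgumentError, "Step method '#{step_name}' must accept 0 or 1 arguments" if step_method.arity > 1

    if step_method.parameters.any? { |type, _name| type == :key || type == :keyreq }
      raise ArgumentError, "Step method '#{step_name}' must not accept keyword arguments"
    end

    # Zero-argument methods are wrapped so they can be called with the step.
    step_method.arity == 0 ? ->(_) { step_method.call } : step_method
  end
end

toy = ToyArity.new

toy.wrap(:no_args).call(:ignored)   # => :no_args
toy.wrap(:one_arg).call(:the_step)  # => [:one_arg, :the_step]

errors = %i[two_args with_keyword].map do |name|
  toy.wrap(name)
  nil
rescue ArgumentError => e
  e.message
end
```

A method with an optional keyword reports a negative arity, so it passes the first check and is caught by the parameters check instead.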