123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::SerializedExecution

Relationships & Source Files
Namespace Children
Classes:
Job
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
Instance Chain:
Inherits: Concurrent::Synchronization::LockableObject
Defined in: lib/concurrent-ruby/concurrent/executor/serialized_execution.rb

Overview

Ensures passed jobs in a serialized order never running at the same time.

Constant Summary

Concern::Logging - Included

SEV_LABEL

Class Method Summary

Instance Method Summary

Concern::Logging - Included

#log

Logs through global_logger, it can be overridden by setting @logger.

Synchronization::LockableObject - Inherited

Constructor Details

.newSerializedExecution

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/serialized_execution.rb', line 11

def initialize()
  super()
  synchronize { ns_initialize }
end

Instance Method Details

#call_job(job) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/serialized_execution.rb', line 75

def call_job(job)
  did_it_run = begin
                 job.executor.post { work(job) }
                 true
               rescue RejectedExecutionError => ex
                 false
               end

  # TODO not the best idea to run it myself
  unless did_it_run
    begin
      work job
    rescue => ex
      # let it fail
      log DEBUG, ex
    end
  end
end

#ns_initialize (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/serialized_execution.rb', line 70

def ns_initialize
  @being_executed = false
  @stash          = []
end

#post(executor, *args) { ... } ⇒ Boolean

Submit a task to the executor for asynchronous processing.

Parameters:

  • executor (Executor)

    to be used for this job

  • args (Array)

    zero or more arguments to be passed to the task

Yields:

  • the asynchronous task to perform

Returns:

  • (Boolean)

    true if the task is queued, false if the executor is not running

Raises:

  • (ArgumentError)

    if no task is given

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/serialized_execution.rb', line 34

def post(executor, *args, &task)
  posts [[executor, args, task]]
  true
end

#posts(posts)

As #post but allows to submit multiple tasks at once, it’s guaranteed that they will not be interleaved by other tasks.

Parameters:

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/serialized_execution.rb', line 44

def posts(posts)
  # if can_overflow?
  #   raise ArgumentError, 'SerializedExecution does not support thread-pools which can overflow'
  # end

  return nil if posts.empty?

  jobs = posts.map { |executor, args, task| Job.new executor, args, task }

  job_to_post = synchronize do
    if @being_executed
      @stash.push(*jobs)
      nil
    else
      @being_executed = true
      @stash.push(*jobs[1..-1])
      jobs.first
    end
  end

  call_job job_to_post if job_to_post
  true
end

#work(job) (private)

ensures next job is executed if any is stashed

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/serialized_execution.rb', line 95

def work(job)
  job.call
ensure
  synchronize do
    job = @stash.shift || (@being_executed = false)
  end

  # TODO maybe be able to tell caching pool to just enqueue this job, because the current one end at the end
  # of this block
  call_job job if job
end