123456789_123456789_123456789_123456789_123456789_

Class: Shell::ProcessController

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
self, Forwardable
Inherits: Object
Defined in: lib/shell/process-controller.rb

Constant Summary

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(shell) ⇒ ProcessController

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 94

# Creates a controller bound to +shell+ with empty job queues.
#
# shell:: the object later exposed through #shell and consulted for
#         verbose?/debug? when reporting on jobs
def initialize(shell)
  @shell = shell

  # Job bookkeeping: queued and started commands, plus the lock the
  # other methods take (in :SH or :EX mode) before touching them.
  @jobs_sync = Sync.new
  @waiting_jobs, @active_jobs = [], []

  # Monitor/condition pair used to announce that a job has terminated
  # (signalled in #sfork, awaited in #wait_all_jobs_execution).
  @job_condition = ConditionVariable.new
  @job_monitor = Mutex.new
end

Class Method Details

.activate(pc)

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 39

# Registers one more outstanding job for the controller +pc+.
#
# pc:: key into the @ProcessControllers table
#
# The table maps each controller to a counter; a missing entry is treated
# as zero before being incremented. The update runs inside
# process_controllers_exclusive.
def activate(pc)
  process_controllers_exclusive do
    @ProcessControllers[pc] = (@ProcessControllers[pc] || 0) + 1
  end
end

.active_process_controllers

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 33

# Returns a shallow copy (Hash#dup) of the @ProcessControllers table, taken
# inside process_controllers_exclusive, so the caller can inspect it
# without holding on to the shared table itself.
def active_process_controllers
  process_controllers_exclusive { @ProcessControllers.dup }
end

.block_output_synchronize(&b)

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 65

# Runs the given block while holding @BlockOutputMonitor.
#
# b:: the block, forwarded unchanged to @BlockOutputMonitor.synchronize
#
# Returns whatever that synchronize call returns.
def block_output_synchronize(&b)
  @BlockOutputMonitor.synchronize(&b)
end

.each_active_object

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 57

# Yields every controller currently registered in @ProcessControllers.
#
# The key list is captured once (Hash#keys) before iterating, and the whole
# iteration runs inside process_controllers_exclusive.
def each_active_object
  process_controllers_exclusive do
    @ProcessControllers.keys.each { |controller| yield controller }
  end
end

.inactivate(pc)

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 46

# Drops one outstanding job from the counter kept for controller +pc+.
#
# pc:: key into the @ProcessControllers table
#
# Unknown controllers are ignored. When the counter reaches zero the entry
# is removed and @ProcessControllersCV is signalled (the condition awaited
# in wait_to_finish_all_process_controllers).
def inactivate(pc)
  process_controllers_exclusive do
    count = @ProcessControllers[pc]
    next unless count

    remaining = count - 1
    @ProcessControllers[pc] = remaining
    next unless remaining == 0

    @ProcessControllers.delete(pc)
    @ProcessControllersCV.signal
  end
end

.wait_to_finish_all_process_controllers

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 69

# Blocks until the @ProcessControllers table is empty.
#
# On every pass a notice is sent through Shell::notify; when Shell.debug?
# is set, each remaining controller and each of its jobs is reported as
# well. The method then waits on @ProcessControllersCV (signalled by
# .inactivate) using @ProcessControllersMonitor and re-checks the table.
def wait_to_finish_all_process_controllers
  process_controllers_exclusive do
    until @ProcessControllers.empty?
      Shell::notify("Process finishing, but active shell exists",
                    "You can use Shell#transact or Shell#check_point for more safe execution.")
      if Shell.debug?
        @ProcessControllers.keys.each do |controller|
          Shell::notify(" Not finished jobs in "+controller.shell.to_s)
          controller.jobs.each { |job| job.notify("  Jobs: %id") }
        end
      end
      @ProcessControllersCV.wait(@ProcessControllersMonitor)
    end
  end
end

Instance Attribute Details

#active_jobs_exist? ⇒ Boolean (readonly)

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 129

# Reports whether at least one job is currently in the active list.
#
# Returns true when @active_jobs is non-empty, false otherwise. The check
# is made under a shared (:SH) hold of @jobs_sync.
#
# The previous implementation returned @active_jobs.empty?, i.e. the
# opposite of what the method name promises.
def active_jobs_exist?
  @jobs_sync.synchronize(:SH) do
    !@active_jobs.empty?
  end
end

#jobs_exist? ⇒ Boolean (readonly)

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 123

# Reports whether any job, active or waiting, is known to this controller.
#
# Returns true when @active_jobs or @waiting_jobs holds at least one
# entry, false when both are empty. The check is made under a shared (:SH)
# hold of @jobs_sync.
#
# The previous implementation tested `empty? or empty?`, which answered
# true for a controller with no jobs at all and false for one with jobs in
# both lists.
def jobs_exist?
  @jobs_sync.synchronize(:SH) do
    !(@active_jobs.empty? && @waiting_jobs.empty?)
  end
end

#shell (readonly)

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 104

# The object passed to the constructor and stored in @shell.
attr_reader :shell

#waiting_jobs_exist? ⇒ Boolean (readonly)

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 135

# Reports whether at least one job is queued but not yet started.
#
# Returns true when @waiting_jobs is non-empty, false otherwise. The check
# is made under a shared (:SH) hold of @jobs_sync.
#
# The previous implementation returned @waiting_jobs.empty?, i.e. the
# opposite of what the method name promises.
def waiting_jobs_exist?
  @jobs_sync.synchronize(:SH) do
    !@waiting_jobs.empty?
  end
end

Instance Method Details

#active_job?(job) ⇒ Boolean

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 180

# Tells whether +job+ is in the active list.
#
# job:: the command object to look for (compared via Array#include?)
#
# The lookup runs under a shared (:SH) hold of @jobs_sync.
def active_job?(job)
  @jobs_sync.synchronize(:SH) { @active_jobs.include?(job) }
end

#active_jobs

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 115

# Returns the list of jobs that #start_job has moved into the active set
# and that have not been removed again.
#
# This is the internal @active_jobs array itself, not a copy, and it is
# read without taking @jobs_sync; use #jobs for a snapshot built under the
# lock.
def active_jobs
  @active_jobs
end

#add_schedule(command)

schedule a command

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 142

# Schedules +command+ for execution.
#
# command:: the job to run
#
# Under an exclusive (:EX) hold of @jobs_sync the controller is first
# registered via ProcessController.activate. If nothing is active the
# command is started right away through #start_job; otherwise it is
# appended to the waiting queue.
def add_schedule(command)
  @jobs_sync.synchronize(:EX) do
    ProcessController.activate(self)
    @active_jobs.empty? ? start_job(command) : @waiting_jobs.push(command)
  end
end

#jobs

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 106

# Returns a new array listing every known job: the waiting ones first,
# followed by the active ones.
#
# The snapshot is built under a shared (:SH) hold of @jobs_sync, so the
# result is independent of later changes to either internal list.
def jobs
  snapshot = nil
  @jobs_sync.synchronize(:SH) do
    snapshot = @waiting_jobs + @active_jobs
  end
  snapshot
end

#kill_job(sig, command)

kill a job

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 199

# Kills (or un-schedules) +command+.
#
# sig::     signal handed to command.kill
# command:: the job to stop
#
# Runs under an exclusive (:EX) hold of @jobs_sync:
# * a job still in the waiting queue is simply removed, the controller is
#   inactivated once, and nil is returned;
# * an active job is sent +sig+ via command.kill; on success the
#   controller is inactivated, the job is dropped from the active list and
#   the result of command.kill is returned;
# * if command.kill raises a StandardError, the job is left in the active
#   list, a warning is printed when @shell.verbose? is set, and nil is
#   returned;
# * a job in neither list yields nil.
def kill_job(sig, command)
  @jobs_sync.synchronize(:EX) do
    if @waiting_jobs.delete command
      ProcessController.inactivate(self)
      return
    elsif @active_jobs.include?(command)
      begin
        r = command.kill(sig)
        ProcessController.inactivate(self)
      rescue => e
        # The message used to be "Shell: Warn: $!\n", which is not an
        # interpolation and printed the two characters "$!" verbatim.
        print "Shell: Warn: #{e}\n" if @shell.verbose?
        return nil
      end
      @active_jobs.delete command
      r
    end
  end
end

#sfork(command)

simple fork

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 235

# Forks a child process that runs the given block with its STDIN/STDOUT
# connected to this process through two pipes, and starts a thread that
# reaps the child.
#
# command:: the job object; it receives progress notifications and is
#           passed to #terminate_job once the child has been waited for
#
# Returns [pid, pipe_me_in, pipe_me_out]: the child's pid, the read end of
# the pipe carrying the child's STDOUT, and the write end of the pipe
# feeding the child's STDIN.
#
# The fork itself happens on a separate thread; the caller blocks on
# pid_cv until that thread has published the pid.
def sfork(command)
  pipe_me_in, pipe_peer_out = IO.pipe
  pipe_peer_in, pipe_me_out = IO.pipe


  pid = nil
  pid_mutex = Mutex.new
  pid_cv = ConditionVariable.new

  Thread.start do
    ProcessController.block_output_synchronize do
      # Flush STDOUT and every active job of every registered controller
      # before forking.
      STDOUT.flush
      ProcessController.each_active_object do |pc|
        for jobs in pc.active_jobs
          jobs.flush
        end
      end

      pid = fork {
        # Child: keep only the current thread.
        Thread.list.each do |th|
          th.kill unless Thread.current == th
        end

        STDIN.reopen(pipe_peer_in)
        STDOUT.reopen(pipe_peer_out)

        # Close every other IO object except the three standard streams.
        ObjectSpace.each_object(IO) do |io|
          if ![STDIN, STDOUT, STDERR].include?(io)
            io.close unless io.closed?
          end
        end

        yield
      }
    end
    # Signal while holding pid_mutex. The caller checks `pid` and calls
    # pid_cv.wait under the same mutex, so an unlocked signal could fire
    # between its check and its wait and be lost, leaving the caller
    # blocked forever.
    pid_mutex.synchronize do
      pid_cv.signal
    end

    # Parent side no longer needs the child's pipe ends.
    pipe_peer_in.close
    pipe_peer_out.close
    command.notify "job(%name:##{pid}) start", @shell.debug?

    begin
      _pid = nil
      command.notify("job(%id) start to waiting finish.", @shell.debug?)
      _pid = Process.waitpid(pid, nil)
    rescue Errno::ECHILD
      command.notify "warn: job(%id) was done already waitpid."
      _pid = true
    ensure
      command.notify("Job(%id): Wait to finish when Process finished.", @shell.debug?)
      # when the process ends, wait until the command terminates
      # NOTE(review): _pid is still nil here only if waitpid was left
      # without returning and without ECHILD; in that case (and unless
      # USING_AT_EXIT_WHEN_PROCESS_EXIT is set) `redo` re-runs the enclosing
      # block from its top - confirm this is the intended retry scope.
      if USING_AT_EXIT_WHEN_PROCESS_EXIT or _pid
      else
        command.notify("notice: Process finishing...",
                       "wait for Job[%id] to finish.",
                       "You can use Shell#transact or Shell#check_point for more safe execution.")
        redo
      end

      # Remove the job from the active set and wake anyone blocked in
      # #wait_all_jobs_execution.
      @job_monitor.synchronize do
        terminate_job(command)
        @job_condition.signal
        command.notify "job(%id) finish.", @shell.debug?
      end
    end
  end

  # Block until the forking thread has stored the child's pid.
  pid_mutex.synchronize do
    while !pid
      pid_cv.wait(pid_mutex)
    end
  end

  return pid, pipe_me_in, pipe_me_out
end

#start_job(command = nil)

start a job

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 154

# Starts a job.
#
# command:: the job to start; when omitted (nil) the first job in the
#           waiting queue is taken instead
#
# Does nothing when the given command is already active?, or when no
# command is given and the waiting queue is empty. Otherwise the command
# is moved to the active list and started, after which every waiting job
# whose +input+ equals this command is started as well (recursively).
# Runs under an exclusive (:EX) hold of @jobs_sync.
def start_job(command = nil)
  @jobs_sync.synchronize(:EX) do
    if command
      return if command.active?
      @waiting_jobs.delete(command)
    elsif !(command = @waiting_jobs.shift)
      return
    end

    @active_jobs << command
    command.start

    # Iterate over a copy: the recursive calls remove entries from
    # @waiting_jobs.
    @waiting_jobs.dup.each do |pending|
      start_job(pending) if pending.input == command
    end
  end
end

#terminate_job(command)

terminate a job

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 187

# Records that +command+ has finished.
#
# command:: the job to remove from the active list
#
# Under an exclusive (:EX) hold of @jobs_sync the job is deleted from
# @active_jobs and the controller is inactivated once. If that leaves no
# active job, the command is notified and #start_job is called without
# arguments.
def terminate_job(command)
  @jobs_sync.synchronize(:EX) do
    @active_jobs.delete command
    ProcessController.inactivate(self)
    next unless @active_jobs.empty?

    command.notify("start_job in terminate_job(%id)", Shell::debug?)
    start_job
  end
end

#wait_all_jobs_execution

wait for all jobs to terminate

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 219

# Blocks the caller until #jobs reports neither waiting nor active jobs.
#
# The method holds @job_monitor and waits on @job_condition, which the
# thread started in #sfork signals after terminating a job. After every
# wake-up each job still listed by #jobs is sent a "waiting job(%id)"
# notification (with Shell::debug? as second argument) and the loop
# condition is checked again.
def wait_all_jobs_execution
  @job_monitor.synchronize do
    begin
      while !jobs.empty?
        @job_condition.wait(@job_monitor)
        for job in jobs
          job.notify("waiting job(%id)", Shell::debug?)
        end
      end
    ensure
      # Runs however the loop above is left, including by an exception:
      # while jobs remain, restart the enclosing block from its top.
      redo unless jobs.empty?
    end
  end
end

#waiting_job?(job) ⇒ Boolean

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 174

# Tells whether +job+ is in the waiting queue.
#
# job:: the command object to look for (compared via Array#include?)
#
# The lookup runs under a shared (:SH) hold of @jobs_sync.
def waiting_job?(job)
  @jobs_sync.synchronize(:SH) { @waiting_jobs.include?(job) }
end

#waiting_jobs

[ GitHub ]

  
# File 'lib/shell/process-controller.rb', line 119

# Returns the queue of jobs that have been scheduled but not yet started.
#
# This is the internal @waiting_jobs array itself, not a copy, and it is
# read without taking @jobs_sync; use #jobs for a snapshot built under the
# lock.
def waiting_jobs
  @waiting_jobs
end